use std::{ffi::c_void, time::Duration};
use crate::{
PCWSTR, WString,
mem::{BoxPool, GetRaw, GetRawWithBoxPool},
runtime::executor::BoxedCancelToken,
types::Uri,
};
use mssf_com::{
FabricClient::{IFabricResolvedServicePartitionResult, IFabricServiceManagementClient8},
FabricTypes::{
FABRIC_DELETE_SERVICE_DESCRIPTION, FABRIC_PARTITION_KEY_TYPE,
FABRIC_PARTITION_KEY_TYPE_INT64, FABRIC_PARTITION_KEY_TYPE_INVALID,
FABRIC_PARTITION_KEY_TYPE_NONE, FABRIC_PARTITION_KEY_TYPE_STRING,
FABRIC_REMOVE_REPLICA_DESCRIPTION, FABRIC_RESOLVED_SERVICE_ENDPOINT,
FABRIC_RESTART_REPLICA_DESCRIPTION, FABRIC_SERVICE_DESCRIPTION,
FABRIC_SERVICE_ENDPOINT_ROLE, FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
FABRIC_SERVICE_PARTITION_KIND, FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
FABRIC_SERVICE_PARTITION_KIND_INVALID, FABRIC_SERVICE_PARTITION_KIND_NAMED,
FABRIC_SERVICE_PARTITION_KIND_SINGLETON, FABRIC_SERVICE_ROLE_INVALID,
FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY, FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY,
FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY, FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY,
FABRIC_SERVICE_ROLE_STATELESS, FABRIC_SERVICE_UPDATE_DESCRIPTION, FABRIC_URI,
},
};
use crate::sync::{FabricReceiver, fabric_begin_end_proxy};
use crate::types::{
RemoveReplicaDescription, RestartReplicaDescription, ServiceNotificationFilterDescription,
};
/// Client wrapper around the `IFabricServiceManagementClient8` COM interface.
///
/// Every operation exposed on this type forwards to the wrapped COM object.
#[derive(Debug, Clone)]
pub struct ServiceManagementClient {
// The wrapped COM interface; cloned whenever a callback needs its own handle.
com: IFabricServiceManagementClient8,
}
impl ServiceManagementClient {
    /// Returns a clone of the wrapped `IFabricServiceManagementClient8` COM interface.
    pub fn get_com(&self) -> IFabricServiceManagementClient8 {
        let Self { com } = self;
        com.clone()
    }
}
impl ServiceManagementClient {
/// Drives the `BeginResolveServicePartition` / `EndResolveServicePartition` COM call pair.
///
/// Every argument except `cancellation_token` is forwarded unchanged to the begin call.
/// The pair is bridged by `fabric_begin_end_proxy`, and the receiver it produces is
/// returned to the caller.
fn resolve_service_partition_internal(
    &self,
    name: FABRIC_URI,
    partition_key_type: FABRIC_PARTITION_KEY_TYPE,
    partition_key: *const ::core::ffi::c_void,
    previous_result: Option<&IFabricResolvedServicePartitionResult>,
    timeout_milliseconds: u32,
    cancellation_token: Option<BoxedCancelToken>,
) -> FabricReceiver<crate::Result<IFabricResolvedServicePartitionResult>> {
    // The end callback owns its own COM handle; the begin callback only borrows `self`.
    let end_com = self.com.clone();
    fabric_begin_end_proxy(
        move |callback| unsafe {
            self.com.BeginResolveServicePartition(
                name,
                partition_key_type,
                partition_key,
                previous_result,
                timeout_milliseconds,
                callback,
            )
        },
        move |ctx| unsafe { end_com.EndResolveServicePartition(ctx) },
        cancellation_token,
    )
}
/// Drives the `BeginRestartReplica` / `EndRestartReplica` COM call pair.
///
/// `desc` and `timeout_milliseconds` are forwarded to the begin call. The pair is bridged
/// by `fabric_begin_end_proxy`, and the receiver it produces is returned to the caller.
fn restart_replica_internal(
    &self,
    desc: &FABRIC_RESTART_REPLICA_DESCRIPTION,
    timeout_milliseconds: u32,
    cancellation_token: Option<BoxedCancelToken>,
) -> FabricReceiver<crate::Result<()>> {
    // The end callback owns its own COM handle; the begin callback only borrows `self`.
    let end_com = self.com.clone();
    fabric_begin_end_proxy(
        move |callback| unsafe {
            self.com
                .BeginRestartReplica(desc, timeout_milliseconds, callback)
        },
        move |ctx| unsafe { end_com.EndRestartReplica(ctx) },
        cancellation_token,
    )
}
/// Drives the `BeginRemoveReplica` / `EndRemoveReplica` COM call pair.
///
/// `desc` and `timeout_milliseconds` are forwarded to the begin call. The pair is bridged
/// by `fabric_begin_end_proxy`, and the receiver it produces is returned to the caller.
fn remove_replica_internal(
    &self,
    desc: &FABRIC_REMOVE_REPLICA_DESCRIPTION,
    timeout_milliseconds: u32,
    cancellation_token: Option<BoxedCancelToken>,
) -> FabricReceiver<crate::Result<()>> {
    // The end callback owns its own COM handle; the begin callback only borrows `self`.
    let end_com = self.com.clone();
    fabric_begin_end_proxy(
        move |callback| unsafe {
            self.com
                .BeginRemoveReplica(desc, timeout_milliseconds, callback)
        },
        move |ctx| unsafe { end_com.EndRemoveReplica(ctx) },
        cancellation_token,
    )
}
/// Drives the `BeginRegisterServiceNotificationFilter` /
/// `EndRegisterServiceNotificationFilter` COM call pair.
///
/// `desc` and `timeout_milliseconds` are forwarded to the begin call. The receiver produced
/// by `fabric_begin_end_proxy` carries the `i64` returned by the end call.
fn register_service_notification_filter_internal(
    &self,
    desc: &FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
    timeout_milliseconds: u32,
    cancellation_token: Option<BoxedCancelToken>,
) -> FabricReceiver<crate::Result<i64>> {
    // The end callback owns its own COM handle; the begin callback only borrows `self`.
    let end_com = self.com.clone();
    fabric_begin_end_proxy(
        move |callback| unsafe {
            self.com
                .BeginRegisterServiceNotificationFilter(desc, timeout_milliseconds, callback)
        },
        move |ctx| unsafe { end_com.EndRegisterServiceNotificationFilter(ctx) },
        cancellation_token,
    )
}
/// Drives the `BeginUnregisterServiceNotificationFilter` /
/// `EndUnregisterServiceNotificationFilter` COM call pair.
///
/// `filterid` and `timeout_milliseconds` are forwarded to the begin call. The pair is
/// bridged by `fabric_begin_end_proxy`, and the receiver it produces is returned.
fn unregister_service_notification_filter_internal(
    &self,
    filterid: i64,
    timeout_milliseconds: u32,
    cancellation_token: Option<BoxedCancelToken>,
) -> FabricReceiver<crate::Result<()>> {
    // The end callback owns its own COM handle; the begin callback only borrows `self`.
    let end_com = self.com.clone();
    fabric_begin_end_proxy(
        move |callback| unsafe {
            self.com
                .BeginUnregisterServiceNotificationFilter(filterid, timeout_milliseconds, callback)
        },
        move |ctx| unsafe { end_com.EndUnregisterServiceNotificationFilter(ctx) },
        cancellation_token,
    )
}
/// Drives the `BeginCreateService` / `EndCreateService` COM call pair.
///
/// `desc` and `timeout_milliseconds` are forwarded to the begin call. The pair is bridged
/// by `fabric_begin_end_proxy`, and the receiver it produces is returned to the caller.
fn create_service_internal(
    &self,
    desc: &FABRIC_SERVICE_DESCRIPTION,
    timeout_milliseconds: u32,
    cancellation_token: Option<BoxedCancelToken>,
) -> FabricReceiver<crate::Result<()>> {
    // The end callback owns its own COM handle; the begin callback only borrows `self`.
    let end_com = self.com.clone();
    fabric_begin_end_proxy(
        move |callback| unsafe {
            self.com
                .BeginCreateService(desc, timeout_milliseconds, callback)
        },
        move |ctx| unsafe { end_com.EndCreateService(ctx) },
        cancellation_token,
    )
}
/// Drives the `BeginUpdateService` / `EndUpdateService` COM call pair.
///
/// `name`, `desc` and `timeout_milliseconds` are forwarded to the begin call. The pair is
/// bridged by `fabric_begin_end_proxy`, and the receiver it produces is returned.
fn update_service_internal(
    &self,
    name: FABRIC_URI,
    desc: &FABRIC_SERVICE_UPDATE_DESCRIPTION,
    timeout_milliseconds: u32,
    cancellation_token: Option<BoxedCancelToken>,
) -> FabricReceiver<crate::Result<()>> {
    // The end callback owns its own COM handle; the begin callback only borrows `self`.
    let end_com = self.com.clone();
    fabric_begin_end_proxy(
        move |callback| unsafe {
            self.com
                .BeginUpdateService(name, desc, timeout_milliseconds, callback)
        },
        move |ctx| unsafe { end_com.EndUpdateService(ctx) },
        cancellation_token,
    )
}
/// Drives the `BeginDeleteService` / `EndDeleteService` COM call pair.
///
/// `name` and `timeout_milliseconds` are forwarded to the begin call. The pair is bridged
/// by `fabric_begin_end_proxy`, and the receiver it produces is returned to the caller.
fn delete_service_internal(
    &self,
    name: FABRIC_URI,
    timeout_milliseconds: u32,
    cancellation_token: Option<BoxedCancelToken>,
) -> FabricReceiver<crate::Result<()>> {
    // The end callback owns its own COM handle; the begin callback only borrows `self`.
    let end_com = self.com.clone();
    fabric_begin_end_proxy(
        move |callback| unsafe {
            self.com
                .BeginDeleteService(name, timeout_milliseconds, callback)
        },
        move |ctx| unsafe { end_com.EndDeleteService(ctx) },
        cancellation_token,
    )
}
/// Drives the `BeginDeleteService2` / `EndDeleteService2` COM call pair.
///
/// `desc` and `timeout_milliseconds` are forwarded to the begin call. The pair is bridged
/// by `fabric_begin_end_proxy`, and the receiver it produces is returned to the caller.
fn delete_service2_internal(
    &self,
    desc: &FABRIC_DELETE_SERVICE_DESCRIPTION,
    timeout_milliseconds: u32,
    cancellation_token: Option<BoxedCancelToken>,
) -> FabricReceiver<crate::Result<()>> {
    // The end callback owns its own COM handle; the begin callback only borrows `self`.
    let end_com = self.com.clone();
    fabric_begin_end_proxy(
        move |callback| unsafe {
            self.com
                .BeginDeleteService2(desc, timeout_milliseconds, callback)
        },
        move |ctx| unsafe { end_com.EndDeleteService2(ctx) },
        cancellation_token,
    )
}
}
impl From<IFabricServiceManagementClient8> for ServiceManagementClient {
fn from(com: IFabricServiceManagementClient8) -> Self {
Self { com }
}
}
impl From<ServiceManagementClient> for IFabricServiceManagementClient8 {
fn from(value: ServiceManagementClient) -> Self {
value.com
}
}
impl ServiceManagementClient {
/// Resolves a service partition through the COM `BeginResolveServicePartition` /
/// `EndResolveServicePartition` operation and wraps the result.
///
/// * `name` - service URI, passed in its raw form.
/// * `key_type` - partition key, converted with `PartitionKeyType::as_raw_parts`.
/// * `prev` - optional earlier result; its COM object is forwarded as the previous result.
/// * `timeout` - converted to whole milliseconds; values that do not fit in `u32` are
///   clamped to `u32::MAX` rather than causing a panic.
/// * `cancellation_token` - forwarded to the begin/end proxy.
///
/// # Errors
///
/// Propagates the error produced by awaiting the receiver as well as the error returned
/// by the COM operation itself.
pub async fn resolve_service_partition(
    &self,
    name: &Uri,
    key_type: &PartitionKeyType,
    prev: Option<&ResolvedServicePartition>,
    timeout: Duration,
    cancellation_token: Option<BoxedCancelToken>,
) -> crate::Result<ResolvedServicePartition> {
    // `as_millis` yields a `u128`; saturate so that a huge timeout such as
    // `Duration::MAX` becomes the longest representable wait instead of a panic.
    let timeout_ms = u32::try_from(timeout.as_millis()).unwrap_or(u32::MAX);
    // The raw URI and key pointer live only inside this block, so they are gone before
    // the `.await` below (presumably to keep raw pointers out of the future's state).
    let operation = {
        let uri = name.as_raw();
        let prev_opt = prev.map(|x| &x.com);
        let (key_type, key) = key_type.as_raw_parts();
        self.resolve_service_partition_internal(
            uri,
            key_type,
            key,
            prev_opt,
            timeout_ms,
            cancellation_token,
        )
    };
    let com = operation.await??;
    Ok(ResolvedServicePartition::from(com))
}
/// Runs the COM `BeginRestartReplica` / `EndRestartReplica` operation for `desc`.
///
/// * `desc` - converted into a `FABRIC_RESTART_REPLICA_DESCRIPTION` for the call.
/// * `timeout` - converted to whole milliseconds; values that do not fit in `u32` are
///   clamped to `u32::MAX` instead of being truncated.
/// * `cancellation_token` - forwarded to the begin/end proxy.
///
/// # Errors
///
/// Propagates the error produced by awaiting the receiver; the COM operation's own
/// result is returned as this function's result.
pub async fn restart_replica(
    &self,
    desc: &RestartReplicaDescription,
    timeout: Duration,
    cancellation_token: Option<BoxedCancelToken>,
) -> crate::Result<()> {
    // `as_millis` yields a `u128`; a bare `as u32` would keep only the low 32 bits and
    // could turn a very long timeout into an arbitrary short one.
    let timeout_ms = u32::try_from(timeout.as_millis()).unwrap_or(u32::MAX);
    // The raw description lives only inside this block, so it is gone before the
    // `.await` (presumably to keep raw pointers out of the future's state).
    let operation = {
        let raw: FABRIC_RESTART_REPLICA_DESCRIPTION = desc.into();
        self.restart_replica_internal(&raw, timeout_ms, cancellation_token)
    };
    operation.await?
}
/// Runs the COM `BeginRemoveReplica` / `EndRemoveReplica` operation for `desc`.
///
/// * `desc` - converted into a `FABRIC_REMOVE_REPLICA_DESCRIPTION` for the call.
/// * `timeout` - converted to whole milliseconds; values that do not fit in `u32` are
///   clamped to `u32::MAX` instead of being truncated.
/// * `cancellation_token` - forwarded to the begin/end proxy.
///
/// # Errors
///
/// Propagates the error produced by awaiting the receiver; the COM operation's own
/// result is returned as this function's result.
pub async fn remove_replica(
    &self,
    desc: &RemoveReplicaDescription,
    timeout: Duration,
    cancellation_token: Option<BoxedCancelToken>,
) -> crate::Result<()> {
    // `as_millis` yields a `u128`; a bare `as u32` would keep only the low 32 bits and
    // could turn a very long timeout into an arbitrary short one.
    let timeout_ms = u32::try_from(timeout.as_millis()).unwrap_or(u32::MAX);
    // The raw description lives only inside this block, so it is gone before the
    // `.await` (presumably to keep raw pointers out of the future's state).
    let operation = {
        let raw: FABRIC_REMOVE_REPLICA_DESCRIPTION = desc.into();
        self.remove_replica_internal(&raw, timeout_ms, cancellation_token)
    };
    operation.await?
}
/// Runs the COM `BeginRegisterServiceNotificationFilter` /
/// `EndRegisterServiceNotificationFilter` operation and wraps the returned id.
///
/// * `desc` - converted into a `FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION`.
/// * `timeout` - converted to whole milliseconds; values that do not fit in `u32` are
///   clamped to `u32::MAX` instead of being truncated.
/// * `cancellation_token` - forwarded to the begin/end proxy.
///
/// Returns a `FilterIdHandle` holding the `i64` id produced by the end call.
///
/// # Errors
///
/// Propagates the error produced by awaiting the receiver as well as the error returned
/// by the COM operation itself.
pub async fn register_service_notification_filter(
    &self,
    desc: &ServiceNotificationFilterDescription,
    timeout: Duration,
    cancellation_token: Option<BoxedCancelToken>,
) -> crate::Result<FilterIdHandle> {
    // `as_millis` yields a `u128`; a bare `as u32` would keep only the low 32 bits and
    // could turn a very long timeout into an arbitrary short one.
    let timeout_ms = u32::try_from(timeout.as_millis()).unwrap_or(u32::MAX);
    // The raw description lives only inside this block, so it is gone before the
    // `.await` (presumably to keep raw pointers out of the future's state).
    let operation = {
        let raw: FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION = desc.into();
        self.register_service_notification_filter_internal(&raw, timeout_ms, cancellation_token)
    };
    let id = operation.await??;
    Ok(FilterIdHandle { id })
}
/// Runs the COM `BeginUnregisterServiceNotificationFilter` /
/// `EndUnregisterServiceNotificationFilter` operation for the given handle.
///
/// * `filter_id_handle` - consumed; its id is forwarded to the COM call.
/// * `timeout` - converted to whole milliseconds; values that do not fit in `u32` are
///   clamped to `u32::MAX` instead of being truncated.
/// * `cancellation_token` - forwarded to the begin/end proxy.
///
/// # Errors
///
/// Propagates the error produced by awaiting the receiver; the COM operation's own
/// result is returned as this function's result.
pub async fn unregister_service_notification_filter(
    &self,
    filter_id_handle: FilterIdHandle,
    timeout: Duration,
    cancellation_token: Option<BoxedCancelToken>,
) -> crate::Result<()> {
    // `as_millis` yields a `u128`; a bare `as u32` would keep only the low 32 bits and
    // could turn a very long timeout into an arbitrary short one.
    let timeout_ms = u32::try_from(timeout.as_millis()).unwrap_or(u32::MAX);
    self.unregister_service_notification_filter_internal(
        filter_id_handle.id,
        timeout_ms,
        cancellation_token,
    )
    .await?
}
/// Runs the COM `BeginCreateService` / `EndCreateService` operation for `desc`.
///
/// * `desc` - converted to its raw FFI form with the help of a temporary `BoxPool`.
/// * `timeout` - converted to whole milliseconds; values that do not fit in `u32` are
///   clamped to `u32::MAX` instead of being truncated.
/// * `cancellation_token` - forwarded to the begin/end proxy.
///
/// # Errors
///
/// Propagates the error produced by awaiting the receiver; the COM operation's own
/// result is returned as this function's result.
pub async fn create_service(
    &self,
    desc: &crate::types::ServiceDescription,
    timeout: Duration,
    cancellation_token: Option<BoxedCancelToken>,
) -> crate::Result<()> {
    // `as_millis` yields a `u128`; a bare `as u32` would keep only the low 32 bits and
    // could turn a very long timeout into an arbitrary short one.
    let timeout_ms = u32::try_from(timeout.as_millis()).unwrap_or(u32::MAX);
    // The pool and the raw description live only inside this block, so both are gone
    // before the `.await` (presumably to keep raw pointers out of the future's state).
    let operation = {
        let mut pool = BoxPool::new();
        let ffi_raw = desc.get_raw_with_pool(&mut pool);
        self.create_service_internal(&ffi_raw, timeout_ms, cancellation_token)
    };
    operation.await?
}
/// Runs the COM `BeginUpdateService` / `EndUpdateService` operation for service `name`.
///
/// * `name` - service URI, passed in its raw form.
/// * `desc` - converted to its raw FFI form with the help of a temporary `BoxPool`.
/// * `timeout` - converted to whole milliseconds; values that do not fit in `u32` are
///   clamped to `u32::MAX` instead of being truncated.
/// * `cancellation_token` - forwarded to the begin/end proxy.
///
/// # Errors
///
/// Propagates the error produced by awaiting the receiver; the COM operation's own
/// result is returned as this function's result.
pub async fn update_service(
    &self,
    name: &Uri,
    desc: &crate::types::ServiceUpdateDescription,
    timeout: Duration,
    cancellation_token: Option<BoxedCancelToken>,
) -> crate::Result<()> {
    // `as_millis` yields a `u128`; a bare `as u32` would keep only the low 32 bits and
    // could turn a very long timeout into an arbitrary short one.
    let timeout_ms = u32::try_from(timeout.as_millis()).unwrap_or(u32::MAX);
    // The pool and the raw description live only inside this block, so both are gone
    // before the `.await` (presumably to keep raw pointers out of the future's state).
    let operation = {
        let mut pool = BoxPool::new();
        let ffi_raw = desc.get_raw_with_pool(&mut pool);
        self.update_service_internal(name.as_raw(), &ffi_raw, timeout_ms, cancellation_token)
    };
    operation.await?
}
/// Runs the COM `BeginDeleteService` / `EndDeleteService` operation for service `name`.
///
/// * `name` - service URI, passed in its raw form.
/// * `timeout` - converted to whole milliseconds; values that do not fit in `u32` are
///   clamped to `u32::MAX` instead of being truncated.
/// * `cancellation_token` - forwarded to the begin/end proxy.
///
/// # Errors
///
/// Propagates the error produced by awaiting the receiver; the COM operation's own
/// result is returned as this function's result.
#[deprecated(note = "Use `delete_service2` instead, which supports force-delete.")]
pub async fn delete_service(
    &self,
    name: &Uri,
    timeout: Duration,
    cancellation_token: Option<BoxedCancelToken>,
) -> crate::Result<()> {
    // `as_millis` yields a `u128`; a bare `as u32` would keep only the low 32 bits and
    // could turn a very long timeout into an arbitrary short one.
    let timeout_ms = u32::try_from(timeout.as_millis()).unwrap_or(u32::MAX);
    self.delete_service_internal(name.as_raw(), timeout_ms, cancellation_token)
        .await?
}
/// Runs the COM `BeginDeleteService2` / `EndDeleteService2` operation for `desc`.
///
/// * `desc` - converted to its raw FFI form via `get_raw`.
/// * `timeout` - converted to whole milliseconds; values that do not fit in `u32` are
///   clamped to `u32::MAX` instead of being truncated.
/// * `cancellation_token` - forwarded to the begin/end proxy.
///
/// # Errors
///
/// Propagates the error produced by awaiting the receiver; the COM operation's own
/// result is returned as this function's result.
pub async fn delete_service2(
    &self,
    desc: &crate::types::DeleteServiceDescription,
    timeout: Duration,
    cancellation_token: Option<BoxedCancelToken>,
) -> crate::Result<()> {
    // `as_millis` yields a `u128`; a bare `as u32` would keep only the low 32 bits and
    // could turn a very long timeout into an arbitrary short one.
    let timeout_ms = u32::try_from(timeout.as_millis()).unwrap_or(u32::MAX);
    // The raw description lives only inside this block, so it is gone before the
    // `.await` (presumably to keep raw pointers out of the future's state).
    let operation = {
        let raw = desc.get_raw();
        self.delete_service2_internal(&raw, timeout_ms, cancellation_token)
    };
    operation.await?
}
}
/// Handle for a registered service notification filter.
///
/// Produced by `register_service_notification_filter` and consumed by value by
/// `unregister_service_notification_filter`. The type does not derive `Clone`.
#[derive(Debug, PartialEq)]
pub struct FilterIdHandle {
// Filter id returned by the COM `EndRegisterServiceNotificationFilter` call.
pub(crate) id: i64,
}
/// Partition key used when resolving a service partition.
///
/// `as_raw_parts` maps each variant to the matching `FABRIC_PARTITION_KEY_TYPE_*`
/// constant plus a pointer to the key data.
#[derive(Debug, PartialEq, Clone)]
pub enum PartitionKeyType {
/// 64-bit integer key (`FABRIC_PARTITION_KEY_TYPE_INT64`).
Int64(i64),
/// Invalid key (`FABRIC_PARTITION_KEY_TYPE_INVALID`); carries no data.
Invalid,
/// No key (`FABRIC_PARTITION_KEY_TYPE_NONE`); carries no data.
None,
/// Wide-string key (`FABRIC_PARTITION_KEY_TYPE_STRING`).
String(WString),
}
impl PartitionKeyType {
    /// Rebuilds a `PartitionKeyType` from a partition kind and a raw pointer to its key.
    ///
    /// `Invalid` and `Singleton` kinds ignore `data`. `Int64Range` reads an `i64` through
    /// `data`, and `Named` copies a wide string starting at `data`.
    ///
    /// # Safety
    ///
    /// For `Int64Range`, `data` must point to a readable, properly aligned `i64`. For
    /// `Named`, `data` must point to a wide string that `WString::from(PCWSTR)` can read
    /// (presumably NUL-terminated UTF-16 — confirm against `WString`).
    ///
    /// # Panics
    ///
    /// Panics if `data` is null for the `Int64Range` or `Named` kinds.
    unsafe fn from_raw_parts(svc: ServicePartitionKind, data: *const c_void) -> PartitionKeyType {
        match svc {
            ServicePartitionKind::Invalid => PartitionKeyType::Invalid,
            ServicePartitionKind::Singleton => PartitionKeyType::None,
            ServicePartitionKind::Int64Range => {
                let number = unsafe { (data as *const i64).as_ref().unwrap() };
                PartitionKeyType::Int64(*number)
            }
            ServicePartitionKind::Named => {
                let x = data as *const u16;
                assert!(!x.is_null());
                PartitionKeyType::String(WString::from(PCWSTR::from_raw(x)))
            }
        }
    }
}
impl PartitionKeyType {
    /// Splits the key into its raw key-type constant and a pointer to the key data.
    ///
    /// `Invalid` and `None` yield a null pointer. For `Int64` and `String` the pointer
    /// refers to data stored inside `self`, so it must not be used after `self` is
    /// dropped or moved.
    fn as_raw_parts(&self) -> (FABRIC_PARTITION_KEY_TYPE, *const c_void) {
        match self {
            PartitionKeyType::Invalid => (FABRIC_PARTITION_KEY_TYPE_INVALID, std::ptr::null()),
            PartitionKeyType::None => (FABRIC_PARTITION_KEY_TYPE_NONE, std::ptr::null()),
            PartitionKeyType::Int64(number) => {
                let data = (number as *const i64).cast::<c_void>();
                (FABRIC_PARTITION_KEY_TYPE_INT64, data)
            }
            PartitionKeyType::String(text) => {
                let data = text.as_pcwstr().as_ptr().cast::<c_void>();
                (FABRIC_PARTITION_KEY_TYPE_STRING, data)
            }
        }
    }
}
/// Kind of a service partition; mirrors the `FABRIC_SERVICE_PARTITION_KIND_*` constants.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ServicePartitionKind {
/// Corresponds to `FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE`.
Int64Range,
/// Corresponds to `FABRIC_SERVICE_PARTITION_KIND_INVALID`.
Invalid,
/// Corresponds to `FABRIC_SERVICE_PARTITION_KIND_NAMED`.
Named,
/// Corresponds to `FABRIC_SERVICE_PARTITION_KIND_SINGLETON`.
Singleton,
}
impl From<&ServicePartitionKind> for FABRIC_SERVICE_PARTITION_KIND {
    /// Converts the Rust enum into the matching raw partition-kind constant.
    fn from(value: &ServicePartitionKind) -> Self {
        // `ServicePartitionKind` is `Copy`, so match on the value itself.
        match *value {
            ServicePartitionKind::Singleton => FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
            ServicePartitionKind::Named => FABRIC_SERVICE_PARTITION_KIND_NAMED,
            ServicePartitionKind::Int64Range => FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
            ServicePartitionKind::Invalid => FABRIC_SERVICE_PARTITION_KIND_INVALID,
        }
    }
}
impl From<FABRIC_SERVICE_PARTITION_KIND> for ServicePartitionKind {
fn from(value: FABRIC_SERVICE_PARTITION_KIND) -> Self {
match value {
FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE => ServicePartitionKind::Int64Range,
FABRIC_SERVICE_PARTITION_KIND_INVALID => ServicePartitionKind::Invalid,
FABRIC_SERVICE_PARTITION_KIND_NAMED => ServicePartitionKind::Named,
FABRIC_SERVICE_PARTITION_KIND_SINGLETON => ServicePartitionKind::Singleton,
_ => {
if cfg!(debug_assertions) {
panic!("unknown type: {value:?}");
} else {
ServicePartitionKind::Invalid
}
}
}
}
}
/// Result of resolving a service partition.
///
/// Built from an `IFabricResolvedServicePartitionResult`; the public fields are copies of
/// the data read from that COM object at construction time.
#[derive(Debug, Clone)]
pub struct ResolvedServicePartition {
// Original COM result, kept so that `compare_version` can call back into it.
com: IFabricResolvedServicePartitionResult,
/// Service name read from the raw partition data.
pub service_name: Uri,
/// Partition kind converted from the raw partition info.
pub service_partition_kind: ServicePartitionKind,
/// Partition key rebuilt from the raw partition kind and value pointer.
pub partition_key_type: PartitionKeyType,
/// Endpoints converted from the raw endpoint array.
pub endpoints: Vec<ResolvedServiceEndpoint>,
}
impl From<IFabricResolvedServicePartitionResult> for ResolvedServicePartition {
fn from(com: IFabricResolvedServicePartitionResult) -> Self {
let raw = unsafe { com.get_Partition().as_ref().unwrap() };
let service_name = Uri::from(raw.ServiceName);
let kind_raw = raw.Info.Kind;
let val = raw.Info.Value;
let service_partition_kind: ServicePartitionKind = kind_raw.into();
let partition_key_type =
unsafe { PartitionKeyType::from_raw_parts(service_partition_kind, val) };
let endpoints = crate::iter::vec_from_raw_com(raw.EndpointCount as usize, raw.Endpoints);
Self {
com,
service_name,
service_partition_kind,
partition_key_type,
endpoints,
}
}
}
impl ResolvedServicePartition {
pub fn compare_version(&self, other: &ResolvedServicePartition) -> crate::Result<i32> {
unsafe { self.com.CompareVersion(&other.com) }.map_err(crate::Error::from)
}
}
impl PartialEq for ResolvedServicePartition {
    /// Two results are equal when `compare_version` succeeds and reports `0`.
    /// A failed comparison counts as not equal.
    fn eq(&self, other: &Self) -> bool {
        matches!(self.compare_version(other), Ok(0))
    }
}
impl PartialOrd for ResolvedServicePartition {
    /// Orders two results by the sign of `compare_version`.
    /// A failed comparison yields `None`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.compare_version(other)
            .ok()
            .map(|version_diff| version_diff.cmp(&0))
    }
}
/// Role of a resolved service endpoint; mirrors the `FABRIC_SERVICE_ROLE_*` constants.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ServiceEndpointRole {
/// Corresponds to `FABRIC_SERVICE_ROLE_INVALID`; also the fallback for unrecognized raw
/// values when debug assertions are disabled.
Invalid,
/// Corresponds to `FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY`.
StatefulPrimary,
/// Corresponds to `FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY`.
StatefulPrimaryAuxiliary,
/// Corresponds to `FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY`.
StatefulSecondary,
/// Corresponds to `FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY`.
StatefulAuxiliary,
/// Corresponds to `FABRIC_SERVICE_ROLE_STATELESS`.
Stateless,
}
impl From<FABRIC_SERVICE_ENDPOINT_ROLE> for ServiceEndpointRole {
fn from(value: FABRIC_SERVICE_ENDPOINT_ROLE) -> Self {
match value {
FABRIC_SERVICE_ROLE_INVALID => ServiceEndpointRole::Invalid,
FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY => ServiceEndpointRole::StatefulPrimary,
FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY => {
ServiceEndpointRole::StatefulPrimaryAuxiliary
}
FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY => ServiceEndpointRole::StatefulSecondary,
FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY => ServiceEndpointRole::StatefulAuxiliary,
FABRIC_SERVICE_ROLE_STATELESS => ServiceEndpointRole::Stateless,
_ => {
if cfg!(debug_assertions) {
panic!("unknown type: {value:?}");
} else {
ServiceEndpointRole::Invalid
}
}
}
}
}
/// One endpoint of a resolved service partition.
#[derive(Debug, Clone, PartialEq)]
pub struct ResolvedServiceEndpoint {
/// Endpoint address, copied from the raw endpoint's `Address` field.
pub address: WString,
/// Endpoint role, converted from the raw endpoint's `Role` field.
pub role: ServiceEndpointRole,
}
impl From<&FABRIC_RESOLVED_SERVICE_ENDPOINT> for ResolvedServiceEndpoint {
fn from(value: &FABRIC_RESOLVED_SERVICE_ENDPOINT) -> Self {
let raw = value;
Self {
address: WString::from(raw.Address),
role: raw.Role.into(),
}
}
}
#[cfg(test)]
mod tests {
    use super::{PartitionKeyType, ServicePartitionKind};
    use crate::{PCWSTR, WString};

    /// An `Int64` key survives the trip through `as_raw_parts` and `from_raw_parts`.
    #[test]
    fn test_conversion_int() {
        let original = PartitionKeyType::Int64(99);
        let (raw_kind, raw_ptr) = original.as_raw_parts();
        assert_eq!(
            raw_kind,
            mssf_com::FabricTypes::FABRIC_PARTITION_KEY_TYPE_INT64
        );

        // The raw pointer must refer to the integer stored inside `original`.
        let pointee = unsafe { (raw_ptr as *const i64).as_ref().unwrap() };
        assert_eq!(*pointee, 99);

        let rebuilt = unsafe {
            PartitionKeyType::from_raw_parts(ServicePartitionKind::Int64Range, raw_ptr)
        };
        assert_eq!(original, rebuilt);
    }

    /// A `String` key survives the trip through `as_raw_parts` and `from_raw_parts`.
    #[test]
    fn test_conversion_string() {
        let text = WString::from("mystr");
        let original = PartitionKeyType::String(text.clone());
        let (raw_kind, raw_ptr) = original.as_raw_parts();
        assert_eq!(
            raw_kind,
            mssf_com::FabricTypes::FABRIC_PARTITION_KEY_TYPE_STRING
        );

        // Reading the raw pointer back as a wide string must give the original text.
        let decoded = WString::from(PCWSTR::from_raw(raw_ptr as *const u16));
        assert_eq!(decoded, text);

        let rebuilt =
            unsafe { PartitionKeyType::from_raw_parts(ServicePartitionKind::Named, raw_ptr) };
        assert_eq!(original, rebuilt);
    }
}