1use std::{ffi::c_void, time::Duration};
7
8use crate::{
9 PCWSTR, WString,
10 mem::{BoxPool, GetRaw, GetRawWithBoxPool},
11 runtime::executor::BoxedCancelToken,
12 types::Uri,
13};
14use mssf_com::{
15 FabricClient::{IFabricResolvedServicePartitionResult, IFabricServiceManagementClient8},
16 FabricTypes::{
17 FABRIC_DELETE_SERVICE_DESCRIPTION, FABRIC_PARTITION_KEY_TYPE,
18 FABRIC_PARTITION_KEY_TYPE_INT64, FABRIC_PARTITION_KEY_TYPE_INVALID,
19 FABRIC_PARTITION_KEY_TYPE_NONE, FABRIC_PARTITION_KEY_TYPE_STRING,
20 FABRIC_REMOVE_REPLICA_DESCRIPTION, FABRIC_RESOLVED_SERVICE_ENDPOINT,
21 FABRIC_RESTART_REPLICA_DESCRIPTION, FABRIC_SERVICE_DESCRIPTION,
22 FABRIC_SERVICE_ENDPOINT_ROLE, FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
23 FABRIC_SERVICE_PARTITION_KIND, FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
24 FABRIC_SERVICE_PARTITION_KIND_INVALID, FABRIC_SERVICE_PARTITION_KIND_NAMED,
25 FABRIC_SERVICE_PARTITION_KIND_SINGLETON, FABRIC_SERVICE_ROLE_INVALID,
26 FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY, FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY,
27 FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY, FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY,
28 FABRIC_SERVICE_ROLE_STATELESS, FABRIC_SERVICE_UPDATE_DESCRIPTION, FABRIC_URI,
29 },
30};
31
32use crate::sync::{FabricReceiver, fabric_begin_end_proxy};
33
34use crate::types::{
35 RemoveReplicaDescription, RestartReplicaDescription, ServiceNotificationFilterDescription,
36};
37
38#[derive(Debug, Clone)]
40pub struct ServiceManagementClient {
41 com: IFabricServiceManagementClient8,
42}
43
44impl ServiceManagementClient {
45 pub fn get_com(&self) -> IFabricServiceManagementClient8 {
46 self.com.clone()
47 }
48}
49impl ServiceManagementClient {
52 fn resolve_service_partition_internal(
53 &self,
54 name: FABRIC_URI,
55 partition_key_type: FABRIC_PARTITION_KEY_TYPE,
56 partition_key: *const ::core::ffi::c_void,
57 previous_result: Option<&IFabricResolvedServicePartitionResult>, timeout_milliseconds: u32,
59 cancellation_token: Option<BoxedCancelToken>,
60 ) -> FabricReceiver<crate::Result<IFabricResolvedServicePartitionResult>> {
61 let com1 = &self.com;
62 let com2 = self.com.clone();
63 fabric_begin_end_proxy(
64 move |callback| unsafe {
65 com1.BeginResolveServicePartition(
66 name,
67 partition_key_type,
68 partition_key,
69 previous_result,
70 timeout_milliseconds,
71 callback,
72 )
73 },
74 move |ctx| unsafe { com2.EndResolveServicePartition(ctx) },
75 cancellation_token,
76 )
77 }
78
79 fn restart_replica_internal(
80 &self,
81 desc: &FABRIC_RESTART_REPLICA_DESCRIPTION,
82 timeout_milliseconds: u32,
83 cancellation_token: Option<BoxedCancelToken>,
84 ) -> FabricReceiver<crate::Result<()>> {
85 let com1 = &self.com;
86 let com2 = self.com.clone();
87 fabric_begin_end_proxy(
88 move |callback| unsafe {
89 com1.BeginRestartReplica(desc, timeout_milliseconds, callback)
90 },
91 move |ctx| unsafe { com2.EndRestartReplica(ctx) },
92 cancellation_token,
93 )
94 }
95
96 fn remove_replica_internal(
97 &self,
98 desc: &FABRIC_REMOVE_REPLICA_DESCRIPTION,
99 timeout_milliseconds: u32,
100 cancellation_token: Option<BoxedCancelToken>,
101 ) -> FabricReceiver<crate::Result<()>> {
102 let com1 = &self.com;
103 let com2 = self.com.clone();
104 fabric_begin_end_proxy(
105 move |callback| unsafe {
106 com1.BeginRemoveReplica(desc, timeout_milliseconds, callback)
107 },
108 move |ctx| unsafe { com2.EndRemoveReplica(ctx) },
109 cancellation_token,
110 )
111 }
112
113 fn register_service_notification_filter_internal(
114 &self,
115 desc: &FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
116 timeout_milliseconds: u32,
117 cancellation_token: Option<BoxedCancelToken>,
118 ) -> FabricReceiver<crate::Result<i64>> {
119 let com1 = &self.com;
120 let com2 = self.com.clone();
121 fabric_begin_end_proxy(
122 move |callback| unsafe {
123 com1.BeginRegisterServiceNotificationFilter(desc, timeout_milliseconds, callback)
124 },
125 move |ctx| unsafe { com2.EndRegisterServiceNotificationFilter(ctx) },
126 cancellation_token,
127 )
128 }
129
130 fn unregister_service_notification_filter_internal(
131 &self,
132 filterid: i64,
133 timeout_milliseconds: u32,
134 cancellation_token: Option<BoxedCancelToken>,
135 ) -> FabricReceiver<crate::Result<()>> {
136 let com1 = &self.com;
137 let com2 = self.com.clone();
138 fabric_begin_end_proxy(
139 move |callback| unsafe {
140 com1.BeginUnregisterServiceNotificationFilter(
141 filterid,
142 timeout_milliseconds,
143 callback,
144 )
145 },
146 move |ctx| unsafe { com2.EndUnregisterServiceNotificationFilter(ctx) },
147 cancellation_token,
148 )
149 }
150
151 fn create_service_internal(
152 &self,
153 desc: &FABRIC_SERVICE_DESCRIPTION,
154 timeout_milliseconds: u32,
155 cancellation_token: Option<BoxedCancelToken>,
156 ) -> FabricReceiver<crate::Result<()>> {
157 let com1 = &self.com;
158 let com2 = self.com.clone();
159 fabric_begin_end_proxy(
160 move |callback| unsafe {
161 com1.BeginCreateService(desc, timeout_milliseconds, callback)
162 },
163 move |ctx| unsafe { com2.EndCreateService(ctx) },
164 cancellation_token,
165 )
166 }
167
168 fn update_service_internal(
169 &self,
170 name: FABRIC_URI,
171 desc: &FABRIC_SERVICE_UPDATE_DESCRIPTION,
172 timeout_milliseconds: u32,
173 cancellation_token: Option<BoxedCancelToken>,
174 ) -> FabricReceiver<crate::Result<()>> {
175 let com1 = &self.com;
176 let com2 = self.com.clone();
177 fabric_begin_end_proxy(
178 move |callback| unsafe {
179 com1.BeginUpdateService(name, desc, timeout_milliseconds, callback)
180 },
181 move |ctx| unsafe { com2.EndUpdateService(ctx) },
182 cancellation_token,
183 )
184 }
185
186 fn delete_service_internal(
187 &self,
188 name: FABRIC_URI,
189 timeout_milliseconds: u32,
190 cancellation_token: Option<BoxedCancelToken>,
191 ) -> FabricReceiver<crate::Result<()>> {
192 let com1 = &self.com;
193 let com2 = self.com.clone();
194 fabric_begin_end_proxy(
195 move |callback| unsafe {
196 com1.BeginDeleteService(name, timeout_milliseconds, callback)
197 },
198 move |ctx| unsafe { com2.EndDeleteService(ctx) },
199 cancellation_token,
200 )
201 }
202
203 fn delete_service2_internal(
204 &self,
205 desc: &FABRIC_DELETE_SERVICE_DESCRIPTION,
206 timeout_milliseconds: u32,
207 cancellation_token: Option<BoxedCancelToken>,
208 ) -> FabricReceiver<crate::Result<()>> {
209 let com1 = &self.com;
210 let com2 = self.com.clone();
211 fabric_begin_end_proxy(
212 move |callback| unsafe {
213 com1.BeginDeleteService2(desc, timeout_milliseconds, callback)
214 },
215 move |ctx| unsafe { com2.EndDeleteService2(ctx) },
216 cancellation_token,
217 )
218 }
219}
220
221impl From<IFabricServiceManagementClient8> for ServiceManagementClient {
222 fn from(com: IFabricServiceManagementClient8) -> Self {
223 Self { com }
224 }
225}
226
227impl From<ServiceManagementClient> for IFabricServiceManagementClient8 {
228 fn from(value: ServiceManagementClient) -> Self {
229 value.com
230 }
231}
232
233impl ServiceManagementClient {
236 pub async fn resolve_service_partition(
238 &self,
239 name: &Uri,
240 key_type: &PartitionKeyType,
241 prev: Option<&ResolvedServicePartition>,
242 timeout: Duration,
243 cancellation_token: Option<BoxedCancelToken>,
244 ) -> crate::Result<ResolvedServicePartition> {
245 let com = {
246 let uri = name.as_raw();
247 let prev_opt = prev.map(|x| &x.com);
249
250 let (key_type, key) = key_type.as_raw_parts();
251
252 self.resolve_service_partition_internal(
253 uri,
254 key_type,
255 key,
256 prev_opt,
257 timeout.as_millis().try_into().unwrap(),
258 cancellation_token,
259 )
260 }
261 .await??;
262 let res = ResolvedServicePartition::from(com);
263 Ok(res)
264 }
265
266 pub async fn restart_replica(
271 &self,
272 desc: &RestartReplicaDescription,
273 timeout: Duration,
274 cancellation_token: Option<BoxedCancelToken>,
275 ) -> crate::Result<()> {
276 {
277 let raw: FABRIC_RESTART_REPLICA_DESCRIPTION = desc.into();
278 self.restart_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
279 }
280 .await?
281 }
282
283 pub async fn remove_replica(
289 &self,
290 desc: &RemoveReplicaDescription,
291 timeout: Duration,
292 cancellation_token: Option<BoxedCancelToken>,
293 ) -> crate::Result<()> {
294 {
295 let raw: FABRIC_REMOVE_REPLICA_DESCRIPTION = desc.into();
296 self.remove_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
297 }
298 .await?
299 }
300
301 pub async fn register_service_notification_filter(
316 &self,
317 desc: &ServiceNotificationFilterDescription,
318 timeout: Duration,
319 cancellation_token: Option<BoxedCancelToken>,
320 ) -> crate::Result<FilterIdHandle> {
321 let id = {
322 let raw: FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION = desc.into();
323 self.register_service_notification_filter_internal(
324 &raw,
325 timeout.as_millis() as u32,
326 cancellation_token,
327 )
328 }
329 .await??;
330 Ok(FilterIdHandle { id })
331 }
332
333 pub async fn unregister_service_notification_filter(
337 &self,
338 filter_id_handle: FilterIdHandle,
339 timeout: Duration,
340 cancellation_token: Option<BoxedCancelToken>,
341 ) -> crate::Result<()> {
342 self.unregister_service_notification_filter_internal(
343 filter_id_handle.id,
344 timeout.as_millis() as u32,
345 cancellation_token,
346 )
347 .await?
348 }
349
350 pub async fn create_service(
351 &self,
352 desc: &crate::types::ServiceDescription,
353 timeout: Duration,
354 cancellation_token: Option<BoxedCancelToken>,
355 ) -> crate::Result<()> {
356 {
357 let mut pool = BoxPool::new();
358 let ffi_raw = desc.get_raw_with_pool(&mut pool);
359 self.create_service_internal(&ffi_raw, timeout.as_millis() as u32, cancellation_token)
360 }
361 .await?
362 }
363
364 pub async fn update_service(
365 &self,
366 name: &Uri,
367 desc: &crate::types::ServiceUpdateDescription,
368 timeout: Duration,
369 cancellation_token: Option<BoxedCancelToken>,
370 ) -> crate::Result<()> {
371 {
372 let mut pool = BoxPool::new();
373 let ffi_raw = desc.get_raw_with_pool(&mut pool);
374 self.update_service_internal(
375 name.as_raw(),
376 &ffi_raw,
377 timeout.as_millis() as u32,
378 cancellation_token,
379 )
380 }
381 .await?
382 }
383
384 #[deprecated(note = "Use `delete_service2` instead, which supports force-delete.")]
385 pub async fn delete_service(
386 &self,
387 name: &Uri,
388 timeout: Duration,
389 cancellation_token: Option<BoxedCancelToken>,
390 ) -> crate::Result<()> {
391 self.delete_service_internal(
392 name.as_raw(),
393 timeout.as_millis() as u32,
394 cancellation_token,
395 )
396 .await?
397 }
398
399 pub async fn delete_service2(
401 &self,
402 desc: &crate::types::DeleteServiceDescription,
403 timeout: Duration,
404 cancellation_token: Option<BoxedCancelToken>,
405 ) -> crate::Result<()> {
406 {
407 let raw = desc.get_raw();
408 self.delete_service2_internal(&raw, timeout.as_millis() as u32, cancellation_token)
409 }
410 .await?
411 }
412}
413
414#[derive(Debug, PartialEq)]
417pub struct FilterIdHandle {
418 pub(crate) id: i64,
419}
420
421#[derive(Debug, PartialEq, Clone)]
423pub enum PartitionKeyType {
424 Int64(i64),
425 Invalid,
426 None,
427 String(WString),
428}
429
430impl PartitionKeyType {
431 unsafe fn from_raw_parts(svc: ServicePartitionKind, data: *const c_void) -> PartitionKeyType {
434 match svc {
435 ServicePartitionKind::Int64Range => {
436 let x = unsafe { (data as *mut i64).as_ref().unwrap() };
437 PartitionKeyType::Int64(*x)
438 }
439 ServicePartitionKind::Invalid => PartitionKeyType::Invalid,
440 ServicePartitionKind::Singleton => PartitionKeyType::None,
441 ServicePartitionKind::Named => {
442 let x = data as *mut u16;
443 assert!(!x.is_null());
444 let s = WString::from(PCWSTR::from_raw(x));
445 PartitionKeyType::String(s)
446 }
447 }
448 }
449}
450
451impl PartitionKeyType {
452 fn as_raw_parts(&self) -> (FABRIC_PARTITION_KEY_TYPE, *const c_void) {
454 match self {
455 PartitionKeyType::Int64(x) => (
457 FABRIC_PARTITION_KEY_TYPE_INT64,
458 x as *const i64 as *const c_void,
459 ),
460 PartitionKeyType::Invalid => (FABRIC_PARTITION_KEY_TYPE_INVALID, std::ptr::null()),
461 PartitionKeyType::None => (FABRIC_PARTITION_KEY_TYPE_NONE, std::ptr::null()),
462 PartitionKeyType::String(x) => (
463 FABRIC_PARTITION_KEY_TYPE_STRING,
464 x.as_pcwstr().as_ptr() as *const c_void,
465 ),
466 }
467 }
468}
469
470#[derive(Clone, Copy, Debug, PartialEq)]
471pub enum ServicePartitionKind {
472 Int64Range,
473 Invalid,
474 Named,
475 Singleton,
476}
477
478impl From<&ServicePartitionKind> for FABRIC_SERVICE_PARTITION_KIND {
479 fn from(value: &ServicePartitionKind) -> Self {
480 match value {
481 ServicePartitionKind::Int64Range => FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
482 ServicePartitionKind::Invalid => FABRIC_SERVICE_PARTITION_KIND_INVALID,
483 ServicePartitionKind::Named => FABRIC_SERVICE_PARTITION_KIND_NAMED,
484 ServicePartitionKind::Singleton => FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
485 }
486 }
487}
488
489impl From<FABRIC_SERVICE_PARTITION_KIND> for ServicePartitionKind {
490 fn from(value: FABRIC_SERVICE_PARTITION_KIND) -> Self {
491 match value {
492 FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE => ServicePartitionKind::Int64Range,
493 FABRIC_SERVICE_PARTITION_KIND_INVALID => ServicePartitionKind::Invalid,
494 FABRIC_SERVICE_PARTITION_KIND_NAMED => ServicePartitionKind::Named,
495 FABRIC_SERVICE_PARTITION_KIND_SINGLETON => ServicePartitionKind::Singleton,
496 _ => {
497 if cfg!(debug_assertions) {
498 panic!("unknown type: {value:?}");
499 } else {
500 ServicePartitionKind::Invalid
501 }
502 }
503 }
504 }
505}
506
507#[derive(Debug, Clone)]
508pub struct ResolvedServicePartition {
509 com: IFabricResolvedServicePartitionResult,
510 pub service_name: Uri,
511 pub service_partition_kind: ServicePartitionKind,
512 pub partition_key_type: PartitionKeyType,
513 pub endpoints: Vec<ResolvedServiceEndpoint>,
514}
515
516impl From<IFabricResolvedServicePartitionResult> for ResolvedServicePartition {
517 fn from(com: IFabricResolvedServicePartitionResult) -> Self {
518 let raw = unsafe { com.get_Partition().as_ref().unwrap() };
519 let service_name = Uri::from(raw.ServiceName);
520 let kind_raw = raw.Info.Kind;
521 let val = raw.Info.Value;
522 let service_partition_kind: ServicePartitionKind = kind_raw.into();
523 let partition_key_type =
524 unsafe { PartitionKeyType::from_raw_parts(service_partition_kind, val) };
525 let endpoints = crate::iter::vec_from_raw_com(raw.EndpointCount as usize, raw.Endpoints);
526 Self {
527 com,
528 service_name,
529 service_partition_kind,
530 partition_key_type,
531 endpoints,
532 }
533 }
534}
535
536impl ResolvedServicePartition {
537 pub fn compare_version(&self, other: &ResolvedServicePartition) -> crate::Result<i32> {
542 unsafe { self.com.CompareVersion(&other.com) }.map_err(crate::Error::from)
543 }
544}
545
546impl PartialEq for ResolvedServicePartition {
547 fn eq(&self, other: &Self) -> bool {
548 match self.compare_version(other) {
549 Ok(i) => i == 0,
550 Err(_) => false, }
552 }
553}
554
555impl PartialOrd for ResolvedServicePartition {
556 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
560 match self.compare_version(other) {
561 Ok(i) => Some(i.cmp(&0)),
562 Err(_) => None,
564 }
565 }
566}
567
568#[derive(Debug, PartialEq, Eq, Clone, Copy)]
569pub enum ServiceEndpointRole {
570 Invalid,
571 StatefulPrimary,
572 StatefulPrimaryAuxiliary,
573 StatefulSecondary,
574 StatefulAuxiliary,
575 Stateless,
576}
577
578impl From<FABRIC_SERVICE_ENDPOINT_ROLE> for ServiceEndpointRole {
579 fn from(value: FABRIC_SERVICE_ENDPOINT_ROLE) -> Self {
580 match value {
581 FABRIC_SERVICE_ROLE_INVALID => ServiceEndpointRole::Invalid,
582 FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY => ServiceEndpointRole::StatefulPrimary,
583 FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY => {
584 ServiceEndpointRole::StatefulPrimaryAuxiliary
585 }
586 FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY => ServiceEndpointRole::StatefulSecondary,
587 FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY => ServiceEndpointRole::StatefulAuxiliary,
588 FABRIC_SERVICE_ROLE_STATELESS => ServiceEndpointRole::Stateless,
589 _ => {
590 if cfg!(debug_assertions) {
591 panic!("unknown type: {value:?}");
592 } else {
593 ServiceEndpointRole::Invalid
594 }
595 }
596 }
597 }
598}
599
600#[derive(Debug, Clone, PartialEq)]
601pub struct ResolvedServiceEndpoint {
602 pub address: WString,
603 pub role: ServiceEndpointRole,
604}
605
606impl From<&FABRIC_RESOLVED_SERVICE_ENDPOINT> for ResolvedServiceEndpoint {
607 fn from(value: &FABRIC_RESOLVED_SERVICE_ENDPOINT) -> Self {
608 let raw = value;
609 Self {
610 address: WString::from(raw.Address),
611 role: raw.Role.into(),
612 }
613 }
614}
615
616#[cfg(test)]
617mod tests {
618 use crate::{PCWSTR, WString};
619
620 use super::{PartitionKeyType, ServicePartitionKind};
621
622 #[test]
623 fn test_conversion_int() {
624 let k = PartitionKeyType::Int64(99);
625 let (key_type, raw) = k.as_raw_parts();
627 assert_eq!(
628 key_type,
629 mssf_com::FabricTypes::FABRIC_PARTITION_KEY_TYPE_INT64
630 );
631 let i = unsafe { (raw as *const i64).as_ref().unwrap() };
632 assert_eq!(*i, 99);
633
634 let service_type = ServicePartitionKind::Int64Range;
635 let k2 = unsafe { PartitionKeyType::from_raw_parts(service_type, raw) };
637 assert_eq!(k, k2);
638 }
639
640 #[test]
641 fn test_conversion_string() {
642 let src = WString::from("mystr");
643 let k = PartitionKeyType::String(src.clone());
644 let (key_type, raw) = k.as_raw_parts();
646 assert_eq!(
647 key_type,
648 mssf_com::FabricTypes::FABRIC_PARTITION_KEY_TYPE_STRING
649 );
650 let s = WString::from(PCWSTR::from_raw(raw as *const u16));
651 assert_eq!(s, src);
652
653 let service_type = ServicePartitionKind::Named;
654 let k2 = unsafe { PartitionKeyType::from_raw_parts(service_type, raw) };
656 assert_eq!(k, k2);
657 }
658}