1use std::{ffi::c_void, time::Duration};
7
8use crate::{PCWSTR, WString, runtime::executor::BoxedCancelToken, types::Uri};
9use mssf_com::{
10 FabricClient::{IFabricResolvedServicePartitionResult, IFabricServiceManagementClient6},
11 FabricTypes::{
12 FABRIC_PARTITION_KEY_TYPE, FABRIC_PARTITION_KEY_TYPE_INT64,
13 FABRIC_PARTITION_KEY_TYPE_INVALID, FABRIC_PARTITION_KEY_TYPE_NONE,
14 FABRIC_PARTITION_KEY_TYPE_STRING, FABRIC_REMOVE_REPLICA_DESCRIPTION,
15 FABRIC_RESOLVED_SERVICE_ENDPOINT, FABRIC_RESTART_REPLICA_DESCRIPTION,
16 FABRIC_SERVICE_DESCRIPTION, FABRIC_SERVICE_ENDPOINT_ROLE,
17 FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, FABRIC_SERVICE_PARTITION_KIND,
18 FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE, FABRIC_SERVICE_PARTITION_KIND_INVALID,
19 FABRIC_SERVICE_PARTITION_KIND_NAMED, FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
20 FABRIC_SERVICE_ROLE_INVALID, FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY,
21 FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY, FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY,
22 FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY, FABRIC_SERVICE_ROLE_STATELESS,
23 FABRIC_SERVICE_UPDATE_DESCRIPTION, FABRIC_URI,
24 },
25};
26
27use crate::sync::{FabricReceiver, fabric_begin_end_proxy};
28
29use crate::types::{
30 RemoveReplicaDescription, RestartReplicaDescription, ServiceNotificationFilterDescription,
31};
32
33#[derive(Debug, Clone)]
35pub struct ServiceManagementClient {
36 com: IFabricServiceManagementClient6,
37}
38
39impl ServiceManagementClient {
40 pub fn get_com(&self) -> IFabricServiceManagementClient6 {
41 self.com.clone()
42 }
43}
44impl ServiceManagementClient {
47 fn resolve_service_partition_internal(
48 &self,
49 name: FABRIC_URI,
50 partition_key_type: FABRIC_PARTITION_KEY_TYPE,
51 partition_key: Option<*const ::core::ffi::c_void>,
52 previous_result: Option<&IFabricResolvedServicePartitionResult>, timeout_milliseconds: u32,
54 cancellation_token: Option<BoxedCancelToken>,
55 ) -> FabricReceiver<crate::WinResult<IFabricResolvedServicePartitionResult>> {
56 let com1 = &self.com;
57 let com2 = self.com.clone();
58 fabric_begin_end_proxy(
59 move |callback| unsafe {
60 com1.BeginResolveServicePartition(
61 name,
62 partition_key_type,
63 partition_key.unwrap_or(std::ptr::null()),
64 previous_result,
65 timeout_milliseconds,
66 callback,
67 )
68 },
69 move |ctx| unsafe { com2.EndResolveServicePartition(ctx) },
70 cancellation_token,
71 )
72 }
73
74 fn restart_replica_internal(
75 &self,
76 desc: &FABRIC_RESTART_REPLICA_DESCRIPTION,
77 timeout_milliseconds: u32,
78 cancellation_token: Option<BoxedCancelToken>,
79 ) -> FabricReceiver<crate::WinResult<()>> {
80 let com1 = &self.com;
81 let com2 = self.com.clone();
82 fabric_begin_end_proxy(
83 move |callback| unsafe {
84 com1.BeginRestartReplica(desc, timeout_milliseconds, callback)
85 },
86 move |ctx| unsafe { com2.EndRestartReplica(ctx) },
87 cancellation_token,
88 )
89 }
90
91 fn remove_replica_internal(
92 &self,
93 desc: &FABRIC_REMOVE_REPLICA_DESCRIPTION,
94 timeout_milliseconds: u32,
95 cancellation_token: Option<BoxedCancelToken>,
96 ) -> FabricReceiver<crate::WinResult<()>> {
97 let com1 = &self.com;
98 let com2 = self.com.clone();
99 fabric_begin_end_proxy(
100 move |callback| unsafe {
101 com1.BeginRemoveReplica(desc, timeout_milliseconds, callback)
102 },
103 move |ctx| unsafe { com2.EndRemoveReplica(ctx) },
104 cancellation_token,
105 )
106 }
107
108 fn register_service_notification_filter_internal(
109 &self,
110 desc: &FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
111 timeout_milliseconds: u32,
112 cancellation_token: Option<BoxedCancelToken>,
113 ) -> FabricReceiver<crate::WinResult<i64>> {
114 let com1 = &self.com;
115 let com2 = self.com.clone();
116 fabric_begin_end_proxy(
117 move |callback| unsafe {
118 com1.BeginRegisterServiceNotificationFilter(desc, timeout_milliseconds, callback)
119 },
120 move |ctx| unsafe { com2.EndRegisterServiceNotificationFilter(ctx) },
121 cancellation_token,
122 )
123 }
124
125 fn unregister_service_notification_filter_internal(
126 &self,
127 filterid: i64,
128 timeout_milliseconds: u32,
129 cancellation_token: Option<BoxedCancelToken>,
130 ) -> FabricReceiver<crate::WinResult<()>> {
131 let com1 = &self.com;
132 let com2 = self.com.clone();
133 fabric_begin_end_proxy(
134 move |callback| unsafe {
135 com1.BeginUnregisterServiceNotificationFilter(
136 filterid,
137 timeout_milliseconds,
138 callback,
139 )
140 },
141 move |ctx| unsafe { com2.EndUnregisterServiceNotificationFilter(ctx) },
142 cancellation_token,
143 )
144 }
145
146 fn create_service_internal(
147 &self,
148 desc: &FABRIC_SERVICE_DESCRIPTION,
149 timeout_milliseconds: u32,
150 cancellation_token: Option<BoxedCancelToken>,
151 ) -> FabricReceiver<crate::WinResult<()>> {
152 let com1 = &self.com;
153 let com2 = self.com.clone();
154 fabric_begin_end_proxy(
155 move |callback| unsafe {
156 com1.BeginCreateService(desc, timeout_milliseconds, callback)
157 },
158 move |ctx| unsafe { com2.EndCreateService(ctx) },
159 cancellation_token,
160 )
161 }
162
163 fn update_service_internal(
164 &self,
165 name: FABRIC_URI,
166 desc: &FABRIC_SERVICE_UPDATE_DESCRIPTION,
167 timeout_milliseconds: u32,
168 cancellation_token: Option<BoxedCancelToken>,
169 ) -> FabricReceiver<crate::WinResult<()>> {
170 let com1 = &self.com;
171 let com2 = self.com.clone();
172 fabric_begin_end_proxy(
173 move |callback| unsafe {
174 com1.BeginUpdateService(name, desc, timeout_milliseconds, callback)
175 },
176 move |ctx| unsafe { com2.EndUpdateService(ctx) },
177 cancellation_token,
178 )
179 }
180
181 fn delete_service_internal(
182 &self,
183 name: FABRIC_URI,
184 timeout_milliseconds: u32,
185 cancellation_token: Option<BoxedCancelToken>,
186 ) -> FabricReceiver<crate::WinResult<()>> {
187 let com1 = &self.com;
188 let com2 = self.com.clone();
189 fabric_begin_end_proxy(
190 move |callback| unsafe {
191 com1.BeginDeleteService(name, timeout_milliseconds, callback)
192 },
193 move |ctx| unsafe { com2.EndDeleteService(ctx) },
194 cancellation_token,
195 )
196 }
197}
198
199impl From<IFabricServiceManagementClient6> for ServiceManagementClient {
200 fn from(com: IFabricServiceManagementClient6) -> Self {
201 Self { com }
202 }
203}
204
205impl From<ServiceManagementClient> for IFabricServiceManagementClient6 {
206 fn from(value: ServiceManagementClient) -> Self {
207 value.com
208 }
209}
210
211impl ServiceManagementClient {
214 pub async fn resolve_service_partition(
216 &self,
217 name: &Uri,
218 key_type: &PartitionKeyType,
219 prev: Option<&ResolvedServicePartition>,
220 timeout: Duration,
221 cancellation_token: Option<BoxedCancelToken>,
222 ) -> crate::Result<ResolvedServicePartition> {
223 let com = {
224 let uri = name.as_raw();
225 let prev_opt = prev.map(|x| &x.com);
227
228 let part_key_opt = key_type.get_raw_opt();
229
230 self.resolve_service_partition_internal(
231 uri,
232 key_type.into(),
233 part_key_opt,
234 prev_opt,
235 timeout.as_millis().try_into().unwrap(),
236 cancellation_token,
237 )
238 }
239 .await??;
240 let res = ResolvedServicePartition::from(com);
241 Ok(res)
242 }
243
244 pub async fn restart_replica(
249 &self,
250 desc: &RestartReplicaDescription,
251 timeout: Duration,
252 cancellation_token: Option<BoxedCancelToken>,
253 ) -> crate::Result<()> {
254 {
255 let raw: FABRIC_RESTART_REPLICA_DESCRIPTION = desc.into();
256 self.restart_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
257 }
258 .await?
259 .map_err(crate::Error::from)
260 }
261
262 pub async fn remove_replica(
268 &self,
269 desc: &RemoveReplicaDescription,
270 timeout: Duration,
271 cancellation_token: Option<BoxedCancelToken>,
272 ) -> crate::Result<()> {
273 {
274 let raw: FABRIC_REMOVE_REPLICA_DESCRIPTION = desc.into();
275 self.remove_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
276 }
277 .await?
278 .map_err(crate::Error::from)
279 }
280
281 pub async fn register_service_notification_filter(
296 &self,
297 desc: &ServiceNotificationFilterDescription,
298 timeout: Duration,
299 cancellation_token: Option<BoxedCancelToken>,
300 ) -> crate::Result<FilterIdHandle> {
301 let id = {
302 let raw: FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION = desc.into();
303 self.register_service_notification_filter_internal(
304 &raw,
305 timeout.as_millis() as u32,
306 cancellation_token,
307 )
308 }
309 .await??;
310 Ok(FilterIdHandle { id })
311 }
312
313 pub async fn unregister_service_notification_filter(
317 &self,
318 filter_id_handle: FilterIdHandle,
319 timeout: Duration,
320 cancellation_token: Option<BoxedCancelToken>,
321 ) -> crate::Result<()> {
322 self.unregister_service_notification_filter_internal(
323 filter_id_handle.id,
324 timeout.as_millis() as u32,
325 cancellation_token,
326 )
327 .await?
328 .map_err(crate::Error::from)
329 }
330
331 pub async fn create_service(
332 &self,
333 desc: &crate::types::ServiceDescription,
334 timeout: Duration,
335 cancellation_token: Option<BoxedCancelToken>,
336 ) -> crate::Result<()> {
337 {
338 let desc_raw = desc.build_raw();
339 let ffi_raw = desc_raw.as_ffi();
340 self.create_service_internal(&ffi_raw, timeout.as_millis() as u32, cancellation_token)
341 }
342 .await?
343 .map_err(crate::Error::from)
344 }
345
346 pub async fn update_service(
347 &self,
348 name: &Uri,
349 desc: &crate::types::ServiceUpdateDescription,
350 timeout: Duration,
351 cancellation_token: Option<BoxedCancelToken>,
352 ) -> crate::Result<()> {
353 {
354 let desc_raw = desc.build_raw();
355 let ffi_raw = desc_raw.as_ffi();
356 self.update_service_internal(
357 name.as_raw(),
358 &ffi_raw,
359 timeout.as_millis() as u32,
360 cancellation_token,
361 )
362 }
363 .await?
364 .map_err(crate::Error::from)
365 }
366
367 pub async fn delete_service(
368 &self,
369 name: &Uri,
370 timeout: Duration,
371 cancellation_token: Option<BoxedCancelToken>,
372 ) -> crate::Result<()> {
373 self.delete_service_internal(
374 name.as_raw(),
375 timeout.as_millis() as u32,
376 cancellation_token,
377 )
378 .await?
379 .map_err(crate::Error::from)
380 }
381}
382
383#[derive(Debug, PartialEq)]
386pub struct FilterIdHandle {
387 pub(crate) id: i64,
388}
389
390#[derive(Debug, PartialEq, Clone)]
392pub enum PartitionKeyType {
393 Int64(i64),
394 Invalid,
395 None,
396 String(WString),
397}
398
399impl PartitionKeyType {
400 fn from_raw_svc_part(svc: ServicePartitionKind, data: *const c_void) -> PartitionKeyType {
401 match svc {
402 ServicePartitionKind::Int64Range => {
403 let x = unsafe { (data as *mut i64).as_ref().unwrap() };
404 PartitionKeyType::Int64(*x)
405 }
406 ServicePartitionKind::Invalid => PartitionKeyType::Invalid,
407 ServicePartitionKind::Singleton => PartitionKeyType::None,
408 ServicePartitionKind::Named => {
409 let x = data as *mut u16;
410 assert!(!x.is_null());
411 let s = WString::from(PCWSTR::from_raw(x));
412 PartitionKeyType::String(s)
413 }
414 }
415 }
416}
417
418impl From<&PartitionKeyType> for FABRIC_PARTITION_KEY_TYPE {
419 fn from(value: &PartitionKeyType) -> Self {
420 match value {
421 PartitionKeyType::Int64(_) => FABRIC_PARTITION_KEY_TYPE_INT64,
422 PartitionKeyType::Invalid => FABRIC_PARTITION_KEY_TYPE_INVALID,
423 PartitionKeyType::None => FABRIC_PARTITION_KEY_TYPE_NONE,
424 PartitionKeyType::String(_) => FABRIC_PARTITION_KEY_TYPE_STRING,
425 }
426 }
427}
428
429impl PartitionKeyType {
430 fn get_raw_opt(&self) -> Option<*const c_void> {
432 match self {
433 PartitionKeyType::Int64(x) => Some(x as *const i64 as *const c_void),
435 PartitionKeyType::Invalid => None,
436 PartitionKeyType::None => None,
437 PartitionKeyType::String(x) => {
438 Some(PCWSTR::from_raw(x.as_ptr()).as_ptr() as *const c_void)
439 }
440 }
441 }
442}
443
444#[derive(Clone, Copy, Debug, PartialEq)]
445pub enum ServicePartitionKind {
446 Int64Range,
447 Invalid,
448 Named,
449 Singleton,
450}
451
452impl From<&ServicePartitionKind> for FABRIC_SERVICE_PARTITION_KIND {
453 fn from(value: &ServicePartitionKind) -> Self {
454 match value {
455 ServicePartitionKind::Int64Range => FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
456 ServicePartitionKind::Invalid => FABRIC_SERVICE_PARTITION_KIND_INVALID,
457 ServicePartitionKind::Named => FABRIC_SERVICE_PARTITION_KIND_NAMED,
458 ServicePartitionKind::Singleton => FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
459 }
460 }
461}
462
463impl From<FABRIC_SERVICE_PARTITION_KIND> for ServicePartitionKind {
464 fn from(value: FABRIC_SERVICE_PARTITION_KIND) -> Self {
465 match value {
466 FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE => ServicePartitionKind::Int64Range,
467 FABRIC_SERVICE_PARTITION_KIND_INVALID => ServicePartitionKind::Invalid,
468 FABRIC_SERVICE_PARTITION_KIND_NAMED => ServicePartitionKind::Named,
469 FABRIC_SERVICE_PARTITION_KIND_SINGLETON => ServicePartitionKind::Singleton,
470 _ => {
471 if cfg!(debug_assertions) {
472 panic!("unknown type: {value:?}");
473 } else {
474 ServicePartitionKind::Invalid
475 }
476 }
477 }
478 }
479}
480
481#[derive(Debug, Clone)]
482pub struct ResolvedServicePartition {
483 com: IFabricResolvedServicePartitionResult,
484 pub service_name: Uri,
485 pub service_partition_kind: ServicePartitionKind,
486 pub partition_key_type: PartitionKeyType,
487 pub endpoints: Vec<ResolvedServiceEndpoint>,
488}
489
490impl From<IFabricResolvedServicePartitionResult> for ResolvedServicePartition {
491 fn from(com: IFabricResolvedServicePartitionResult) -> Self {
492 let raw = unsafe { com.get_Partition().as_ref().unwrap() };
493 let service_name = Uri::from(raw.ServiceName);
494 let kind_raw = raw.Info.Kind;
495 let val = raw.Info.Value;
496 let service_partition_kind: ServicePartitionKind = kind_raw.into();
497 let partition_key_type = PartitionKeyType::from_raw_svc_part(service_partition_kind, val);
498 let endpoints = crate::iter::vec_from_raw_com(raw.EndpointCount as usize, raw.Endpoints);
499 Self {
500 com,
501 service_name,
502 service_partition_kind,
503 partition_key_type,
504 endpoints,
505 }
506 }
507}
508
509impl ResolvedServicePartition {
510 pub fn compare_version(&self, other: &ResolvedServicePartition) -> crate::Result<i32> {
515 unsafe { self.com.CompareVersion(&other.com) }.map_err(crate::Error::from)
516 }
517}
518
519impl PartialEq for ResolvedServicePartition {
520 fn eq(&self, other: &Self) -> bool {
521 match self.compare_version(other) {
522 Ok(i) => i == 0,
523 Err(_) => false, }
525 }
526}
527
528impl PartialOrd for ResolvedServicePartition {
529 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
533 match self.compare_version(other) {
534 Ok(i) => Some(i.cmp(&0)),
535 Err(_) => None,
537 }
538 }
539}
540
541#[derive(Debug, PartialEq, Eq, Clone, Copy)]
542pub enum ServiceEndpointRole {
543 Invalid,
544 StatefulPrimary,
545 StatefulPrimaryAuxiliary,
546 StatefulSecondary,
547 StatefulAuxiliary,
548 Stateless,
549}
550
551impl From<FABRIC_SERVICE_ENDPOINT_ROLE> for ServiceEndpointRole {
552 fn from(value: FABRIC_SERVICE_ENDPOINT_ROLE) -> Self {
553 match value {
554 FABRIC_SERVICE_ROLE_INVALID => ServiceEndpointRole::Invalid,
555 FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY => ServiceEndpointRole::StatefulPrimary,
556 FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY => {
557 ServiceEndpointRole::StatefulPrimaryAuxiliary
558 }
559 FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY => ServiceEndpointRole::StatefulSecondary,
560 FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY => ServiceEndpointRole::StatefulAuxiliary,
561 FABRIC_SERVICE_ROLE_STATELESS => ServiceEndpointRole::Stateless,
562 _ => {
563 if cfg!(debug_assertions) {
564 panic!("unknown type: {value:?}");
565 } else {
566 ServiceEndpointRole::Invalid
567 }
568 }
569 }
570 }
571}
572
573#[derive(Debug, Clone, PartialEq)]
574pub struct ResolvedServiceEndpoint {
575 pub address: WString,
576 pub role: ServiceEndpointRole,
577}
578
579impl From<&FABRIC_RESOLVED_SERVICE_ENDPOINT> for ResolvedServiceEndpoint {
580 fn from(value: &FABRIC_RESOLVED_SERVICE_ENDPOINT) -> Self {
581 let raw = value;
582 Self {
583 address: WString::from(raw.Address),
584 role: raw.Role.into(),
585 }
586 }
587}
588
589#[cfg(test)]
590mod tests {
591 use crate::{PCWSTR, WString};
592
593 use super::{PartitionKeyType, ServicePartitionKind};
594
595 #[test]
596 fn test_conversion_int() {
597 let k = PartitionKeyType::Int64(99);
598 let raw = k.get_raw_opt();
600 let i = unsafe { (raw.unwrap() as *const i64).as_ref().unwrap() };
601 assert_eq!(*i, 99);
602
603 let service_type = ServicePartitionKind::Int64Range;
604 let k2 = PartitionKeyType::from_raw_svc_part(service_type, raw.unwrap());
606 assert_eq!(k, k2);
607 }
608
609 #[test]
610 fn test_conversion_string() {
611 let src = WString::from("mystr");
612 let k = PartitionKeyType::String(src.clone());
613 let raw = k.get_raw_opt();
615 let s = WString::from(PCWSTR::from_raw(raw.unwrap() as *const u16));
616 assert_eq!(s, src);
617
618 let service_type = ServicePartitionKind::Named;
619 let k2 = PartitionKeyType::from_raw_svc_part(service_type, raw.unwrap());
621 assert_eq!(k, k2);
622 }
623}