1use std::{ffi::c_void, time::Duration};
7
8use crate::{
9 PCWSTR, WString,
10 mem::{BoxPool, GetRawWithBoxPool},
11 runtime::executor::BoxedCancelToken,
12 types::Uri,
13};
14use mssf_com::{
15 FabricClient::{IFabricResolvedServicePartitionResult, IFabricServiceManagementClient8},
16 FabricTypes::{
17 FABRIC_PARTITION_KEY_TYPE, FABRIC_PARTITION_KEY_TYPE_INT64,
18 FABRIC_PARTITION_KEY_TYPE_INVALID, FABRIC_PARTITION_KEY_TYPE_NONE,
19 FABRIC_PARTITION_KEY_TYPE_STRING, FABRIC_REMOVE_REPLICA_DESCRIPTION,
20 FABRIC_RESOLVED_SERVICE_ENDPOINT, FABRIC_RESTART_REPLICA_DESCRIPTION,
21 FABRIC_SERVICE_DESCRIPTION, FABRIC_SERVICE_ENDPOINT_ROLE,
22 FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, FABRIC_SERVICE_PARTITION_KIND,
23 FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE, FABRIC_SERVICE_PARTITION_KIND_INVALID,
24 FABRIC_SERVICE_PARTITION_KIND_NAMED, FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
25 FABRIC_SERVICE_ROLE_INVALID, FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY,
26 FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY, FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY,
27 FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY, FABRIC_SERVICE_ROLE_STATELESS,
28 FABRIC_SERVICE_UPDATE_DESCRIPTION, FABRIC_URI,
29 },
30};
31
32use crate::sync::{FabricReceiver, fabric_begin_end_proxy};
33
34use crate::types::{
35 RemoveReplicaDescription, RestartReplicaDescription, ServiceNotificationFilterDescription,
36};
37
38#[derive(Debug, Clone)]
40pub struct ServiceManagementClient {
41 com: IFabricServiceManagementClient8,
42}
43
44impl ServiceManagementClient {
45 pub fn get_com(&self) -> IFabricServiceManagementClient8 {
46 self.com.clone()
47 }
48}
49impl ServiceManagementClient {
52 fn resolve_service_partition_internal(
53 &self,
54 name: FABRIC_URI,
55 partition_key_type: FABRIC_PARTITION_KEY_TYPE,
56 partition_key: *const ::core::ffi::c_void,
57 previous_result: Option<&IFabricResolvedServicePartitionResult>, timeout_milliseconds: u32,
59 cancellation_token: Option<BoxedCancelToken>,
60 ) -> FabricReceiver<crate::Result<IFabricResolvedServicePartitionResult>> {
61 let com1 = &self.com;
62 let com2 = self.com.clone();
63 fabric_begin_end_proxy(
64 move |callback| unsafe {
65 com1.BeginResolveServicePartition(
66 name,
67 partition_key_type,
68 partition_key,
69 previous_result,
70 timeout_milliseconds,
71 callback,
72 )
73 },
74 move |ctx| unsafe { com2.EndResolveServicePartition(ctx) },
75 cancellation_token,
76 )
77 }
78
79 fn restart_replica_internal(
80 &self,
81 desc: &FABRIC_RESTART_REPLICA_DESCRIPTION,
82 timeout_milliseconds: u32,
83 cancellation_token: Option<BoxedCancelToken>,
84 ) -> FabricReceiver<crate::Result<()>> {
85 let com1 = &self.com;
86 let com2 = self.com.clone();
87 fabric_begin_end_proxy(
88 move |callback| unsafe {
89 com1.BeginRestartReplica(desc, timeout_milliseconds, callback)
90 },
91 move |ctx| unsafe { com2.EndRestartReplica(ctx) },
92 cancellation_token,
93 )
94 }
95
96 fn remove_replica_internal(
97 &self,
98 desc: &FABRIC_REMOVE_REPLICA_DESCRIPTION,
99 timeout_milliseconds: u32,
100 cancellation_token: Option<BoxedCancelToken>,
101 ) -> FabricReceiver<crate::Result<()>> {
102 let com1 = &self.com;
103 let com2 = self.com.clone();
104 fabric_begin_end_proxy(
105 move |callback| unsafe {
106 com1.BeginRemoveReplica(desc, timeout_milliseconds, callback)
107 },
108 move |ctx| unsafe { com2.EndRemoveReplica(ctx) },
109 cancellation_token,
110 )
111 }
112
113 fn register_service_notification_filter_internal(
114 &self,
115 desc: &FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
116 timeout_milliseconds: u32,
117 cancellation_token: Option<BoxedCancelToken>,
118 ) -> FabricReceiver<crate::Result<i64>> {
119 let com1 = &self.com;
120 let com2 = self.com.clone();
121 fabric_begin_end_proxy(
122 move |callback| unsafe {
123 com1.BeginRegisterServiceNotificationFilter(desc, timeout_milliseconds, callback)
124 },
125 move |ctx| unsafe { com2.EndRegisterServiceNotificationFilter(ctx) },
126 cancellation_token,
127 )
128 }
129
130 fn unregister_service_notification_filter_internal(
131 &self,
132 filterid: i64,
133 timeout_milliseconds: u32,
134 cancellation_token: Option<BoxedCancelToken>,
135 ) -> FabricReceiver<crate::Result<()>> {
136 let com1 = &self.com;
137 let com2 = self.com.clone();
138 fabric_begin_end_proxy(
139 move |callback| unsafe {
140 com1.BeginUnregisterServiceNotificationFilter(
141 filterid,
142 timeout_milliseconds,
143 callback,
144 )
145 },
146 move |ctx| unsafe { com2.EndUnregisterServiceNotificationFilter(ctx) },
147 cancellation_token,
148 )
149 }
150
151 fn create_service_internal(
152 &self,
153 desc: &FABRIC_SERVICE_DESCRIPTION,
154 timeout_milliseconds: u32,
155 cancellation_token: Option<BoxedCancelToken>,
156 ) -> FabricReceiver<crate::Result<()>> {
157 let com1 = &self.com;
158 let com2 = self.com.clone();
159 fabric_begin_end_proxy(
160 move |callback| unsafe {
161 com1.BeginCreateService(desc, timeout_milliseconds, callback)
162 },
163 move |ctx| unsafe { com2.EndCreateService(ctx) },
164 cancellation_token,
165 )
166 }
167
168 fn update_service_internal(
169 &self,
170 name: FABRIC_URI,
171 desc: &FABRIC_SERVICE_UPDATE_DESCRIPTION,
172 timeout_milliseconds: u32,
173 cancellation_token: Option<BoxedCancelToken>,
174 ) -> FabricReceiver<crate::Result<()>> {
175 let com1 = &self.com;
176 let com2 = self.com.clone();
177 fabric_begin_end_proxy(
178 move |callback| unsafe {
179 com1.BeginUpdateService(name, desc, timeout_milliseconds, callback)
180 },
181 move |ctx| unsafe { com2.EndUpdateService(ctx) },
182 cancellation_token,
183 )
184 }
185
186 fn delete_service_internal(
187 &self,
188 name: FABRIC_URI,
189 timeout_milliseconds: u32,
190 cancellation_token: Option<BoxedCancelToken>,
191 ) -> FabricReceiver<crate::Result<()>> {
192 let com1 = &self.com;
193 let com2 = self.com.clone();
194 fabric_begin_end_proxy(
195 move |callback| unsafe {
196 com1.BeginDeleteService(name, timeout_milliseconds, callback)
197 },
198 move |ctx| unsafe { com2.EndDeleteService(ctx) },
199 cancellation_token,
200 )
201 }
202}
203
204impl From<IFabricServiceManagementClient8> for ServiceManagementClient {
205 fn from(com: IFabricServiceManagementClient8) -> Self {
206 Self { com }
207 }
208}
209
210impl From<ServiceManagementClient> for IFabricServiceManagementClient8 {
211 fn from(value: ServiceManagementClient) -> Self {
212 value.com
213 }
214}
215
216impl ServiceManagementClient {
219 pub async fn resolve_service_partition(
221 &self,
222 name: &Uri,
223 key_type: &PartitionKeyType,
224 prev: Option<&ResolvedServicePartition>,
225 timeout: Duration,
226 cancellation_token: Option<BoxedCancelToken>,
227 ) -> crate::Result<ResolvedServicePartition> {
228 let com = {
229 let uri = name.as_raw();
230 let prev_opt = prev.map(|x| &x.com);
232
233 let (key_type, key) = key_type.as_raw_parts();
234
235 self.resolve_service_partition_internal(
236 uri,
237 key_type,
238 key,
239 prev_opt,
240 timeout.as_millis().try_into().unwrap(),
241 cancellation_token,
242 )
243 }
244 .await??;
245 let res = ResolvedServicePartition::from(com);
246 Ok(res)
247 }
248
249 pub async fn restart_replica(
254 &self,
255 desc: &RestartReplicaDescription,
256 timeout: Duration,
257 cancellation_token: Option<BoxedCancelToken>,
258 ) -> crate::Result<()> {
259 {
260 let raw: FABRIC_RESTART_REPLICA_DESCRIPTION = desc.into();
261 self.restart_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
262 }
263 .await?
264 }
265
266 pub async fn remove_replica(
272 &self,
273 desc: &RemoveReplicaDescription,
274 timeout: Duration,
275 cancellation_token: Option<BoxedCancelToken>,
276 ) -> crate::Result<()> {
277 {
278 let raw: FABRIC_REMOVE_REPLICA_DESCRIPTION = desc.into();
279 self.remove_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
280 }
281 .await?
282 }
283
284 pub async fn register_service_notification_filter(
299 &self,
300 desc: &ServiceNotificationFilterDescription,
301 timeout: Duration,
302 cancellation_token: Option<BoxedCancelToken>,
303 ) -> crate::Result<FilterIdHandle> {
304 let id = {
305 let raw: FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION = desc.into();
306 self.register_service_notification_filter_internal(
307 &raw,
308 timeout.as_millis() as u32,
309 cancellation_token,
310 )
311 }
312 .await??;
313 Ok(FilterIdHandle { id })
314 }
315
316 pub async fn unregister_service_notification_filter(
320 &self,
321 filter_id_handle: FilterIdHandle,
322 timeout: Duration,
323 cancellation_token: Option<BoxedCancelToken>,
324 ) -> crate::Result<()> {
325 self.unregister_service_notification_filter_internal(
326 filter_id_handle.id,
327 timeout.as_millis() as u32,
328 cancellation_token,
329 )
330 .await?
331 }
332
333 pub async fn create_service(
334 &self,
335 desc: &crate::types::ServiceDescription,
336 timeout: Duration,
337 cancellation_token: Option<BoxedCancelToken>,
338 ) -> crate::Result<()> {
339 {
340 let mut pool = BoxPool::new();
341 let ffi_raw = desc.get_raw_with_pool(&mut pool);
342 self.create_service_internal(&ffi_raw, timeout.as_millis() as u32, cancellation_token)
343 }
344 .await?
345 }
346
347 pub async fn update_service(
348 &self,
349 name: &Uri,
350 desc: &crate::types::ServiceUpdateDescription,
351 timeout: Duration,
352 cancellation_token: Option<BoxedCancelToken>,
353 ) -> crate::Result<()> {
354 {
355 let mut pool = BoxPool::new();
356 let ffi_raw = desc.get_raw_with_pool(&mut pool);
357 self.update_service_internal(
358 name.as_raw(),
359 &ffi_raw,
360 timeout.as_millis() as u32,
361 cancellation_token,
362 )
363 }
364 .await?
365 }
366
367 pub async fn delete_service(
368 &self,
369 name: &Uri,
370 timeout: Duration,
371 cancellation_token: Option<BoxedCancelToken>,
372 ) -> crate::Result<()> {
373 self.delete_service_internal(
374 name.as_raw(),
375 timeout.as_millis() as u32,
376 cancellation_token,
377 )
378 .await?
379 }
380}
381
/// Handle for a registered service notification filter.
///
/// Returned by `register_service_notification_filter` and consumed by
/// `unregister_service_notification_filter`.
#[derive(PartialEq, Debug)]
pub struct FilterIdHandle {
    /// Filter id returned by the register call.
    pub(crate) id: i64,
}
388
389#[derive(Debug, PartialEq, Clone)]
391pub enum PartitionKeyType {
392 Int64(i64),
393 Invalid,
394 None,
395 String(WString),
396}
397
398impl PartitionKeyType {
399 unsafe fn from_raw_parts(svc: ServicePartitionKind, data: *const c_void) -> PartitionKeyType {
402 match svc {
403 ServicePartitionKind::Int64Range => {
404 let x = unsafe { (data as *mut i64).as_ref().unwrap() };
405 PartitionKeyType::Int64(*x)
406 }
407 ServicePartitionKind::Invalid => PartitionKeyType::Invalid,
408 ServicePartitionKind::Singleton => PartitionKeyType::None,
409 ServicePartitionKind::Named => {
410 let x = data as *mut u16;
411 assert!(!x.is_null());
412 let s = WString::from(PCWSTR::from_raw(x));
413 PartitionKeyType::String(s)
414 }
415 }
416 }
417}
418
419impl PartitionKeyType {
420 fn as_raw_parts(&self) -> (FABRIC_PARTITION_KEY_TYPE, *const c_void) {
422 match self {
423 PartitionKeyType::Int64(x) => (
425 FABRIC_PARTITION_KEY_TYPE_INT64,
426 x as *const i64 as *const c_void,
427 ),
428 PartitionKeyType::Invalid => (FABRIC_PARTITION_KEY_TYPE_INVALID, std::ptr::null()),
429 PartitionKeyType::None => (FABRIC_PARTITION_KEY_TYPE_NONE, std::ptr::null()),
430 PartitionKeyType::String(x) => (
431 FABRIC_PARTITION_KEY_TYPE_STRING,
432 x.as_pcwstr().as_ptr() as *const c_void,
433 ),
434 }
435 }
436}
437
/// Partitioning scheme of a service.
///
/// Each variant maps to one `FABRIC_SERVICE_PARTITION_KIND_*` constant.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ServicePartitionKind {
    /// Maps to `FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE`.
    Int64Range,
    /// Maps to `FABRIC_SERVICE_PARTITION_KIND_INVALID`.
    Invalid,
    /// Maps to `FABRIC_SERVICE_PARTITION_KIND_NAMED`.
    Named,
    /// Maps to `FABRIC_SERVICE_PARTITION_KIND_SINGLETON`.
    Singleton,
}
445
446impl From<&ServicePartitionKind> for FABRIC_SERVICE_PARTITION_KIND {
447 fn from(value: &ServicePartitionKind) -> Self {
448 match value {
449 ServicePartitionKind::Int64Range => FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
450 ServicePartitionKind::Invalid => FABRIC_SERVICE_PARTITION_KIND_INVALID,
451 ServicePartitionKind::Named => FABRIC_SERVICE_PARTITION_KIND_NAMED,
452 ServicePartitionKind::Singleton => FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
453 }
454 }
455}
456
457impl From<FABRIC_SERVICE_PARTITION_KIND> for ServicePartitionKind {
458 fn from(value: FABRIC_SERVICE_PARTITION_KIND) -> Self {
459 match value {
460 FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE => ServicePartitionKind::Int64Range,
461 FABRIC_SERVICE_PARTITION_KIND_INVALID => ServicePartitionKind::Invalid,
462 FABRIC_SERVICE_PARTITION_KIND_NAMED => ServicePartitionKind::Named,
463 FABRIC_SERVICE_PARTITION_KIND_SINGLETON => ServicePartitionKind::Singleton,
464 _ => {
465 if cfg!(debug_assertions) {
466 panic!("unknown type: {value:?}");
467 } else {
468 ServicePartitionKind::Invalid
469 }
470 }
471 }
472 }
473}
474
475#[derive(Debug, Clone)]
476pub struct ResolvedServicePartition {
477 com: IFabricResolvedServicePartitionResult,
478 pub service_name: Uri,
479 pub service_partition_kind: ServicePartitionKind,
480 pub partition_key_type: PartitionKeyType,
481 pub endpoints: Vec<ResolvedServiceEndpoint>,
482}
483
484impl From<IFabricResolvedServicePartitionResult> for ResolvedServicePartition {
485 fn from(com: IFabricResolvedServicePartitionResult) -> Self {
486 let raw = unsafe { com.get_Partition().as_ref().unwrap() };
487 let service_name = Uri::from(raw.ServiceName);
488 let kind_raw = raw.Info.Kind;
489 let val = raw.Info.Value;
490 let service_partition_kind: ServicePartitionKind = kind_raw.into();
491 let partition_key_type =
492 unsafe { PartitionKeyType::from_raw_parts(service_partition_kind, val) };
493 let endpoints = crate::iter::vec_from_raw_com(raw.EndpointCount as usize, raw.Endpoints);
494 Self {
495 com,
496 service_name,
497 service_partition_kind,
498 partition_key_type,
499 endpoints,
500 }
501 }
502}
503
504impl ResolvedServicePartition {
505 pub fn compare_version(&self, other: &ResolvedServicePartition) -> crate::Result<i32> {
510 unsafe { self.com.CompareVersion(&other.com) }.map_err(crate::Error::from)
511 }
512}
513
514impl PartialEq for ResolvedServicePartition {
515 fn eq(&self, other: &Self) -> bool {
516 match self.compare_version(other) {
517 Ok(i) => i == 0,
518 Err(_) => false, }
520 }
521}
522
523impl PartialOrd for ResolvedServicePartition {
524 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
528 match self.compare_version(other) {
529 Ok(i) => Some(i.cmp(&0)),
530 Err(_) => None,
532 }
533 }
534}
535
/// Role of a resolved service endpoint.
///
/// Each variant maps to one `FABRIC_SERVICE_ROLE_*` constant.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ServiceEndpointRole {
    /// Maps to `FABRIC_SERVICE_ROLE_INVALID`.
    Invalid,
    /// Maps to `FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY`.
    StatefulPrimary,
    /// Maps to `FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY`.
    StatefulPrimaryAuxiliary,
    /// Maps to `FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY`.
    StatefulSecondary,
    /// Maps to `FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY`.
    StatefulAuxiliary,
    /// Maps to `FABRIC_SERVICE_ROLE_STATELESS`.
    Stateless,
}
545
546impl From<FABRIC_SERVICE_ENDPOINT_ROLE> for ServiceEndpointRole {
547 fn from(value: FABRIC_SERVICE_ENDPOINT_ROLE) -> Self {
548 match value {
549 FABRIC_SERVICE_ROLE_INVALID => ServiceEndpointRole::Invalid,
550 FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY => ServiceEndpointRole::StatefulPrimary,
551 FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY => {
552 ServiceEndpointRole::StatefulPrimaryAuxiliary
553 }
554 FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY => ServiceEndpointRole::StatefulSecondary,
555 FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY => ServiceEndpointRole::StatefulAuxiliary,
556 FABRIC_SERVICE_ROLE_STATELESS => ServiceEndpointRole::Stateless,
557 _ => {
558 if cfg!(debug_assertions) {
559 panic!("unknown type: {value:?}");
560 } else {
561 ServiceEndpointRole::Invalid
562 }
563 }
564 }
565 }
566}
567
568#[derive(Debug, Clone, PartialEq)]
569pub struct ResolvedServiceEndpoint {
570 pub address: WString,
571 pub role: ServiceEndpointRole,
572}
573
574impl From<&FABRIC_RESOLVED_SERVICE_ENDPOINT> for ResolvedServiceEndpoint {
575 fn from(value: &FABRIC_RESOLVED_SERVICE_ENDPOINT) -> Self {
576 let raw = value;
577 Self {
578 address: WString::from(raw.Address),
579 role: raw.Role.into(),
580 }
581 }
582}
583
#[cfg(test)]
mod tests {
    use super::{PartitionKeyType, ServicePartitionKind};
    use crate::{PCWSTR, WString};

    /// An `Int64` key survives `as_raw_parts` followed by `from_raw_parts`.
    #[test]
    fn test_conversion_int() {
        let original = PartitionKeyType::Int64(99);
        let (tag, ptr) = original.as_raw_parts();
        assert_eq!(tag, mssf_com::FabricTypes::FABRIC_PARTITION_KEY_TYPE_INT64);

        // The pointer refers to the integer stored inside `original`.
        let pointee = unsafe { ptr.cast::<i64>().as_ref() }.unwrap();
        assert_eq!(*pointee, 99);

        let rebuilt =
            unsafe { PartitionKeyType::from_raw_parts(ServicePartitionKind::Int64Range, ptr) };
        assert_eq!(original, rebuilt);
    }

    /// A `String` key survives `as_raw_parts` followed by `from_raw_parts`.
    #[test]
    fn test_conversion_string() {
        let text = WString::from("mystr");
        let original = PartitionKeyType::String(text.clone());
        let (tag, ptr) = original.as_raw_parts();
        assert_eq!(tag, mssf_com::FabricTypes::FABRIC_PARTITION_KEY_TYPE_STRING);

        // Reading the pointer back as a wide string gives the source text.
        let read_back = WString::from(PCWSTR::from_raw(ptr.cast::<u16>()));
        assert_eq!(read_back, text);

        let rebuilt =
            unsafe { PartitionKeyType::from_raw_parts(ServicePartitionKind::Named, ptr) };
        assert_eq!(original, rebuilt);
    }
}