1#![allow(non_camel_case_types)]
10
11use std::sync::Arc;
12
13use crate::{
14 Interface, runtime::stateful_proxy::StatefulServicePartition, strings::StringResult, types::Uri,
15};
16use windows_core::implement;
17
18use mssf_com::{
19 FabricCommon::IFabricStringResult,
20 FabricRuntime::{
21 IFabricPrimaryReplicator, IFabricPrimaryReplicator_Impl, IFabricReplicator,
22 IFabricReplicator_Impl, IFabricReplicatorCatchupSpecificQuorum,
23 IFabricReplicatorCatchupSpecificQuorum_Impl, IFabricStatefulServiceFactory,
24 IFabricStatefulServiceFactory_Impl, IFabricStatefulServicePartition,
25 IFabricStatefulServicePartition3, IFabricStatefulServiceReplica,
26 IFabricStatefulServiceReplica_Impl,
27 },
28 FabricTypes::{
29 FABRIC_EPOCH, FABRIC_REPLICA_INFORMATION, FABRIC_REPLICA_OPEN_MODE, FABRIC_REPLICA_ROLE,
30 FABRIC_REPLICA_SET_CONFIGURATION, FABRIC_REPLICA_SET_QUORUM_MODE, FABRIC_URI,
31 },
32};
33
34use crate::{
35 WString,
36 sync::BridgeContext,
37 types::{Epoch, OpenMode, ReplicaInformation, ReplicaRole, ReplicaSetConfig},
38};
39
40use super::{
41 executor::Executor,
42 stateful::{PrimaryReplicator, Replicator, StatefulServiceFactory, StatefulServiceReplica},
43};
/// COM-facing wrapper around a user supplied [`StatefulServiceFactory`].
///
/// The `#[implement]` attribute exposes this type as `IFabricStatefulServiceFactory`.
/// `inner` is the wrapped factory; `rt` is the executor that is cloned into every
/// replica bridge created by `CreateReplica`.
#[implement(IFabricStatefulServiceFactory)]
pub struct StatefulServiceFactoryBridge<E, F>
where
    E: Executor + 'static,
    F: StatefulServiceFactory + 'static,
{
    // User factory that actually builds replicas.
    inner: F,
    // Executor handed on to the replica bridges.
    rt: E,
}
57
58impl<E, F> StatefulServiceFactoryBridge<E, F>
59where
60 E: Executor,
61 F: StatefulServiceFactory,
62{
63 pub fn create(factory: F, rt: E) -> StatefulServiceFactoryBridge<E, F> {
64 StatefulServiceFactoryBridge::<E, F> { inner: factory, rt }
65 }
66}
67
68impl<E, F> IFabricStatefulServiceFactory_Impl for StatefulServiceFactoryBridge_Impl<E, F>
69where
70 E: Executor,
71 F: StatefulServiceFactory,
72{
73 #[allow(clippy::not_unsafe_ptr_arg_deref)]
74 #[cfg_attr(
75 feature = "tracing",
76 tracing::instrument(skip_all, ret(level = "debug"), err)
77 )]
78 fn CreateReplica(
79 &self,
80 servicetypename: &crate::PCWSTR,
81 servicename: FABRIC_URI,
82 initializationdatalength: u32,
83 initializationdata: *const u8,
84 partitionid: &crate::GUID,
85 replicaid: i64,
86 ) -> crate::WinResult<IFabricStatefulServiceReplica> {
87 let h_servicename = Uri::from(servicename);
88 let h_servicetypename = WString::from(*servicetypename);
89 let data = unsafe {
90 if !initializationdata.is_null() {
91 std::slice::from_raw_parts(initializationdata, initializationdatalength as usize)
92 } else {
93 &[]
94 }
95 };
96
97 let replica = self.inner.create_replica(
98 h_servicetypename,
99 h_servicename,
100 data,
101 *partitionid,
102 replicaid,
103 )?;
104 let rt = self.rt.clone();
105 let replica_bridge = IFabricStatefulServiceReplicaBridge::create(replica, rt);
106 Ok(replica_bridge.into())
107 }
108}
109
/// COM-facing wrapper around a [`Replicator`], exposed as `IFabricReplicator`
/// through the `#[implement]` attribute.
///
/// The replicator is held in an `Arc` so that each asynchronous operation can
/// move its own handle into the spawned future, and so that the same instance
/// can be shared with `IFabricPrimaryReplicatorBridge`.
#[implement(IFabricReplicator)]
pub struct IFabricReplicatorBridge<E, R>
where
    E: Executor,
    R: Replicator,
{
    // Shared handle to the wrapped replicator.
    inner: Arc<R>,
    // Executor on which the Begin* operations are spawned.
    rt: E,
}
125
126impl<E, R> IFabricReplicatorBridge<E, R>
127where
128 E: Executor,
129 R: Replicator,
130{
131 pub fn create(rplctr: R, rt: E) -> IFabricReplicatorBridge<E, R> {
132 IFabricReplicatorBridge {
133 inner: Arc::new(rplctr),
134 rt,
135 }
136 }
137
138 fn create_from_primary_replicator(replicator: Arc<R>, rt: E) -> IFabricReplicatorBridge<E, R> {
139 IFabricReplicatorBridge {
140 inner: replicator,
141 rt,
142 }
143 }
144}
145
146impl<E, R> IFabricReplicator_Impl for IFabricReplicatorBridge_Impl<E, R>
147where
148 E: Executor,
149 R: Replicator,
150{
151 #[cfg_attr(
152 feature = "tracing",
153 tracing::instrument(skip_all, ret(level = "debug"), err)
154 )]
155 fn BeginOpen(
156 &self,
157 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
158 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
159 let inner = self.inner.clone();
160 let (ctx, token) = BridgeContext::make(callback);
161 ctx.spawn(&self.rt, async move {
162 inner
163 .open(token)
164 .await
165 .map(|s| IFabricStringResult::from(StringResult::new(s)))
166 .map_err(crate::WinError::from)
167 })
168 }
169
170 #[cfg_attr(
171 feature = "tracing",
172 tracing::instrument(skip_all, ret(level = "debug"), err)
173 )]
174 fn EndOpen(
175 &self,
176 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
177 ) -> crate::WinResult<IFabricStringResult> {
178 BridgeContext::result(context)?
179 }
180
181 #[allow(clippy::not_unsafe_ptr_arg_deref)]
182 #[cfg_attr(
183 feature = "tracing",
184 tracing::instrument(skip_all, ret(level = "debug"), err)
185 )]
186 fn BeginChangeRole(
187 &self,
188 epoch: *const FABRIC_EPOCH,
189 role: FABRIC_REPLICA_ROLE,
190 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
191 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
192 let inner = self.inner.clone();
193 let epoch2: Epoch = unsafe { epoch.as_ref().unwrap().into() };
194 let role2: ReplicaRole = (&role).into();
195
196 let (ctx, token) = BridgeContext::make(callback);
197 ctx.spawn(&self.rt, async move {
198 inner
199 .change_role(epoch2, role2, token)
200 .await
201 .map_err(crate::WinError::from)
202 })
203 }
204 #[cfg_attr(
205 feature = "tracing",
206 tracing::instrument(skip_all, ret(level = "debug"), err)
207 )]
208 fn EndChangeRole(
209 &self,
210 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
211 ) -> crate::WinResult<()> {
212 BridgeContext::result(context)?
213 }
214
215 #[allow(clippy::not_unsafe_ptr_arg_deref)]
216 #[cfg_attr(
217 feature = "tracing",
218 tracing::instrument(skip_all, ret(level = "debug"), err)
219 )]
220 fn BeginUpdateEpoch(
221 &self,
222 epoch: *const FABRIC_EPOCH,
223 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
224 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
225 let inner = self.inner.clone();
226 let epoch2: Epoch = unsafe { epoch.as_ref().unwrap().into() };
227 let (ctx, token) = BridgeContext::make(callback);
228 ctx.spawn(&self.rt, async move {
229 inner
230 .update_epoch(epoch2, token)
231 .await
232 .map_err(crate::WinError::from)
233 })
234 }
235
236 #[cfg_attr(
237 feature = "tracing",
238 tracing::instrument(skip_all, ret(level = "debug"), err)
239 )]
240 fn EndUpdateEpoch(
241 &self,
242 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
243 ) -> crate::WinResult<()> {
244 BridgeContext::result(context)?
245 }
246
247 #[cfg_attr(
248 feature = "tracing",
249 tracing::instrument(skip_all, ret(level = "debug"), err)
250 )]
251 fn BeginClose(
252 &self,
253 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
254 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
255 let inner = self.inner.clone();
256 let (ctx, token) = BridgeContext::make(callback);
257 ctx.spawn(&self.rt, async move {
258 inner.close(token).await.map_err(crate::WinError::from)
259 })
260 }
261
262 #[cfg_attr(
263 feature = "tracing",
264 tracing::instrument(skip_all, ret(level = "debug"), err)
265 )]
266 fn EndClose(
267 &self,
268 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
269 ) -> crate::WinResult<()> {
270 BridgeContext::result(context)?
271 }
272
273 #[cfg_attr(
274 feature = "tracing",
275 tracing::instrument(skip_all, ret(level = "debug"))
276 )]
277 fn Abort(&self) {
278 self.inner.abort();
279 }
280
281 #[cfg_attr(
282 feature = "tracing",
283 tracing::instrument(skip_all, ret(level = "debug"), err)
284 )]
285 fn GetCurrentProgress(&self) -> crate::WinResult<i64> {
286 let lsn = self.inner.get_current_progress();
287 lsn.map_err(crate::WinError::from)
288 }
289
290 #[cfg_attr(
291 feature = "tracing",
292 tracing::instrument(skip_all, ret(level = "debug"), err)
293 )]
294 fn GetCatchUpCapability(&self) -> crate::WinResult<i64> {
295 let lsn = self.inner.get_catch_up_capability();
296 lsn.map_err(crate::WinError::from)
297 }
298}
299
/// COM-facing wrapper around a [`PrimaryReplicator`], exposed as
/// `IFabricPrimaryReplicator` and `IFabricReplicatorCatchupSpecificQuorum`
/// through the `#[implement]` attribute.
///
/// The `IFabricReplicator` methods are not re-implemented here; they are
/// forwarded to `rplctr`, a replicator bridge built over the same `inner` handle.
#[implement(IFabricPrimaryReplicator, IFabricReplicatorCatchupSpecificQuorum)]
pub struct IFabricPrimaryReplicatorBridge<E, P>
where
    E: Executor,
    P: PrimaryReplicator,
{
    // Shared handle to the wrapped primary replicator.
    inner: Arc<P>,
    // Executor on which the Begin* operations are spawned.
    rt: E,
    // Replicator bridge sharing `inner`; target of the forwarded IFabricReplicator calls.
    rplctr: IFabricReplicator,
}
320
321impl<E, P> IFabricPrimaryReplicatorBridge<E, P>
322where
323 E: Executor,
324 P: PrimaryReplicator,
325{
326 pub fn create(rplctr: P, rt: E) -> IFabricPrimaryReplicatorBridge<E, P> {
327 let inner = Arc::new(rplctr);
328
329 let replicator_bridge =
339 IFabricReplicatorBridge::create_from_primary_replicator(inner.clone(), rt.clone());
340
341 IFabricPrimaryReplicatorBridge {
342 inner,
343 rt,
344 rplctr: replicator_bridge.into(),
345 }
346 }
347}
348
349impl<E, P> IFabricReplicator_Impl for IFabricPrimaryReplicatorBridge_Impl<E, P>
351where
352 E: Executor,
353 P: PrimaryReplicator,
354{
355 fn BeginOpen(
356 &self,
357 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
358 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
359 unsafe { self.rplctr.BeginOpen(callback.as_ref()) }
360 }
361
362 fn EndOpen(
363 &self,
364 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
365 ) -> crate::WinResult<IFabricStringResult> {
366 unsafe { self.rplctr.EndOpen(context.as_ref()) }
367 }
368
369 #[allow(clippy::not_unsafe_ptr_arg_deref)]
370 fn BeginChangeRole(
371 &self,
372 epoch: *const FABRIC_EPOCH,
373 role: FABRIC_REPLICA_ROLE,
374 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
375 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
376 unsafe { self.rplctr.BeginChangeRole(epoch, role, callback.as_ref()) }
377 }
378
379 fn EndChangeRole(
380 &self,
381 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
382 ) -> crate::WinResult<()> {
383 unsafe { self.rplctr.EndChangeRole(context.as_ref()) }
384 }
385
386 #[allow(clippy::not_unsafe_ptr_arg_deref)]
387 fn BeginUpdateEpoch(
388 &self,
389 epoch: *const FABRIC_EPOCH,
390 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
391 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
392 unsafe { self.rplctr.BeginUpdateEpoch(epoch, callback.as_ref()) }
393 }
394
395 fn EndUpdateEpoch(
396 &self,
397 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
398 ) -> crate::WinResult<()> {
399 unsafe { self.rplctr.EndUpdateEpoch(context.as_ref()) }
400 }
401
402 fn BeginClose(
403 &self,
404 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
405 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
406 unsafe { self.rplctr.BeginClose(callback.as_ref()) }
407 }
408
409 fn EndClose(
410 &self,
411 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
412 ) -> crate::WinResult<()> {
413 unsafe { self.rplctr.EndClose(context.as_ref()) }
414 }
415
416 fn Abort(&self) {
417 unsafe { self.rplctr.Abort() }
418 }
419
420 fn GetCurrentProgress(&self) -> crate::WinResult<i64> {
421 unsafe { self.rplctr.GetCurrentProgress() }
422 }
423
424 fn GetCatchUpCapability(&self) -> crate::WinResult<i64> {
425 unsafe { self.rplctr.GetCatchUpCapability() }
426 }
427}
428
429impl<E, P> IFabricPrimaryReplicator_Impl for IFabricPrimaryReplicatorBridge_Impl<E, P>
430where
431 E: Executor,
432 P: PrimaryReplicator,
433{
434 #[cfg_attr(
435 feature = "tracing",
436 tracing::instrument(skip_all, ret(level = "debug"), err)
437 )]
438 fn BeginOnDataLoss(
439 &self,
440 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
441 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
442 let inner = self.inner.clone();
443
444 let (ctx, token) = BridgeContext::make(callback);
445 ctx.spawn(&self.rt, async move {
446 inner
447 .on_data_loss(token)
448 .await
449 .map_err(crate::WinError::from)
450 })
451 }
452
453 #[cfg_attr(
454 feature = "tracing",
455 tracing::instrument(skip_all, ret(level = "debug"), err)
456 )]
457 fn EndOnDataLoss(
458 &self,
459 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
460 ) -> crate::WinResult<u8> {
461 BridgeContext::result(context)?
462 }
463
464 #[allow(clippy::not_unsafe_ptr_arg_deref)]
465 #[cfg_attr(
466 feature = "tracing",
467 tracing::instrument(skip_all, ret(level = "debug"), err)
468 )]
469 fn UpdateCatchUpReplicaSetConfiguration(
470 &self,
471 currentconfiguration: *const FABRIC_REPLICA_SET_CONFIGURATION,
472 previousconfiguration: *const FABRIC_REPLICA_SET_CONFIGURATION,
473 ) -> crate::WinResult<()> {
474 let cc = ReplicaSetConfig::from(unsafe { currentconfiguration.as_ref().unwrap() });
475 let pc = ReplicaSetConfig::from(unsafe { previousconfiguration.as_ref().unwrap() });
476 self.inner
477 .update_catch_up_replica_set_configuration(cc, pc)
478 .map_err(crate::WinError::from)
479 }
480
481 #[cfg_attr(
482 feature = "tracing",
483 tracing::instrument(skip_all, ret(level = "debug"), err)
484 )]
485 fn BeginWaitForCatchUpQuorum(
486 &self,
487 catchupmode: FABRIC_REPLICA_SET_QUORUM_MODE,
488 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
489 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
490 let catchupmode = catchupmode.into();
491 let inner = self.inner.clone();
492 let (ctx, token) = BridgeContext::make(callback);
493 ctx.spawn(&self.rt, async move {
494 inner
495 .wait_for_catch_up_quorum(catchupmode, token)
496 .await
497 .map_err(crate::WinError::from)
498 })
499 }
500
501 #[cfg_attr(
502 feature = "tracing",
503 tracing::instrument(skip_all, ret(level = "debug"), err)
504 )]
505 fn EndWaitForCatchUpQuorum(
506 &self,
507 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
508 ) -> crate::WinResult<()> {
509 BridgeContext::result(context)?
510 }
511
512 #[allow(clippy::not_unsafe_ptr_arg_deref)]
513 #[cfg_attr(
514 feature = "tracing",
515 tracing::instrument(skip_all, ret(level = "debug"), err)
516 )]
517 fn UpdateCurrentReplicaSetConfiguration(
518 &self,
519 currentconfiguration: *const FABRIC_REPLICA_SET_CONFIGURATION,
520 ) -> crate::WinResult<()> {
521 let c = ReplicaSetConfig::from(unsafe { currentconfiguration.as_ref() }.unwrap());
522 self.inner
523 .update_current_replica_set_configuration(c)
524 .map_err(crate::WinError::from)
525 }
526
527 #[allow(clippy::not_unsafe_ptr_arg_deref)]
528 #[cfg_attr(
529 feature = "tracing",
530 tracing::instrument(skip_all, ret(level = "debug"), err)
531 )]
532 fn BeginBuildReplica(
533 &self,
534 replica: *const FABRIC_REPLICA_INFORMATION,
535 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
536 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
537 let inner = self.inner.clone();
538 let r = ReplicaInformation::from(unsafe { replica.as_ref().unwrap() });
539 debug_assert_eq!(r.role, ReplicaRole::IdleSecondary);
541 debug_assert_eq!(r.catch_up_capability, -1);
542 debug_assert_eq!(r.current_progress, -1);
543
544 let (ctx, token) = BridgeContext::make(callback);
545 ctx.spawn(&self.rt, async move {
546 inner
547 .build_replica(r, token)
548 .await
549 .map_err(crate::WinError::from)
550 })
551 }
552
553 #[cfg_attr(
554 feature = "tracing",
555 tracing::instrument(skip_all, ret(level = "debug"), err)
556 )]
557 fn EndBuildReplica(
558 &self,
559 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
560 ) -> crate::WinResult<()> {
561 BridgeContext::result(context)?
562 }
563
564 #[cfg_attr(
565 feature = "tracing",
566 tracing::instrument(skip_all, ret(level = "debug"), err)
567 )]
568 fn RemoveReplica(&self, replicaid: i64) -> crate::WinResult<()> {
569 self.inner
570 .remove_replica(replicaid)
571 .map_err(crate::WinError::from)
572 }
573}
574
// Intentionally empty: no methods are implemented for this interface here.
// The impl exists so the bridge, which lists IFabricReplicatorCatchupSpecificQuorum
// in its `#[implement]` attribute, satisfies the corresponding `_Impl` trait.
impl<E, P> IFabricReplicatorCatchupSpecificQuorum_Impl for IFabricPrimaryReplicatorBridge_Impl<E, P>
where
    E: Executor,
    P: PrimaryReplicator,
{
}
/// COM-facing wrapper around a user supplied [`StatefulServiceReplica`],
/// exposed as `IFabricStatefulServiceReplica` through the `#[implement]` attribute.
///
/// The replica is held in an `Arc` so that each asynchronous operation can move
/// its own handle into the spawned future.
#[implement(IFabricStatefulServiceReplica)]
pub struct IFabricStatefulServiceReplicaBridge<E, R>
where
    E: Executor,
    R: StatefulServiceReplica + 'static,
{
    // Shared handle to the wrapped replica.
    inner: Arc<R>,
    // Executor on which the Begin* operations are spawned.
    rt: E,
}
597
598impl<E, R> IFabricStatefulServiceReplicaBridge<E, R>
599where
600 E: Executor,
601 R: StatefulServiceReplica,
602{
603 pub fn create(rplctr: R, rt: E) -> IFabricStatefulServiceReplicaBridge<E, R> {
604 IFabricStatefulServiceReplicaBridge {
605 inner: Arc::new(rplctr),
606 rt,
607 }
608 }
609}
610
611impl<E, R> IFabricStatefulServiceReplica_Impl for IFabricStatefulServiceReplicaBridge_Impl<E, R>
612where
613 E: Executor,
614 R: StatefulServiceReplica,
615{
616 #[cfg_attr(
617 feature = "tracing",
618 tracing::instrument(skip_all, ret(level = "debug"), err)
619 )]
620 fn BeginOpen(
621 &self,
622 openmode: FABRIC_REPLICA_OPEN_MODE,
623 partition: windows_core::Ref<IFabricStatefulServicePartition>,
624 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
625 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
626 let inner = self.inner.clone();
627 let rt_cp = self.rt.clone();
628 let openmode2: OpenMode = openmode.into();
629 let com_partition = partition
630 .unwrap()
631 .cast::<IFabricStatefulServicePartition3>()
632 .expect("cannot query interface");
633 let partition = StatefulServicePartition::from(&com_partition);
634 let (ctx, token) = BridgeContext::make(callback);
635 ctx.spawn(&self.rt, async move {
636 inner
637 .open(openmode2, partition, token)
638 .await
639 .map(|s| {
640 let bridge: IFabricPrimaryReplicator =
641 IFabricPrimaryReplicatorBridge::create(s, rt_cp).into();
642 bridge.clone().cast::<IFabricReplicator>().unwrap()
643 })
644 .map_err(crate::WinError::from)
645 })
646 }
647
648 #[cfg_attr(
649 feature = "tracing",
650 tracing::instrument(skip_all, ret(level = "debug"), err)
651 )]
652 fn EndOpen(
653 &self,
654 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
655 ) -> crate::WinResult<IFabricReplicator> {
656 BridgeContext::result(context)?
657 }
658
659 #[cfg_attr(
660 feature = "tracing",
661 tracing::instrument(skip_all, ret(level = "debug"), err)
662 )]
663 fn BeginChangeRole(
664 &self,
665 newrole: FABRIC_REPLICA_ROLE,
666 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
667 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
668 let inner = self.inner.clone();
669 let newrole2: ReplicaRole = (&newrole).into();
670 let (ctx, token) = BridgeContext::make(callback);
671 ctx.spawn(&self.rt, async move {
672 inner
673 .change_role(newrole2, token)
674 .await
675 .map(|s| IFabricStringResult::from(StringResult::new(s)))
676 .map_err(crate::WinError::from)
677 })
678 }
679
680 #[cfg_attr(
681 feature = "tracing",
682 tracing::instrument(skip_all, ret(level = "debug"), err)
683 )]
684 fn EndChangeRole(
685 &self,
686 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
687 ) -> crate::WinResult<IFabricStringResult> {
688 BridgeContext::result(context)?
689 }
690
691 #[cfg_attr(
692 feature = "tracing",
693 tracing::instrument(skip_all, ret(level = "debug"), err)
694 )]
695 fn BeginClose(
696 &self,
697 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
698 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
699 let inner = self.inner.clone();
700 let (ctx, token) = BridgeContext::make(callback);
701 ctx.spawn(&self.rt, async move {
702 inner.close(token).await.map_err(crate::WinError::from)
703 })
704 }
705
706 #[cfg_attr(
707 feature = "tracing",
708 tracing::instrument(skip_all, ret(level = "debug"), err)
709 )]
710 fn EndClose(
711 &self,
712 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
713 ) -> crate::WinResult<()> {
714 BridgeContext::result(context)?
715 }
716
717 #[cfg_attr(
718 feature = "tracing",
719 tracing::instrument(skip_all, ret(level = "debug"))
720 )]
721 fn Abort(&self) {
722 self.inner.as_ref().abort();
723 }
724}
725
726