1#![allow(non_camel_case_types)]
10
11use std::sync::Arc;
12
13use crate::{Interface, runtime::stateful_proxy::StatefulServicePartition};
14use windows_core::implement;
15
16use mssf_com::{
17 FabricCommon::IFabricStringResult,
18 FabricRuntime::{
19 IFabricPrimaryReplicator, IFabricPrimaryReplicator_Impl, IFabricReplicator,
20 IFabricReplicator_Impl, IFabricReplicatorCatchupSpecificQuorum,
21 IFabricReplicatorCatchupSpecificQuorum_Impl, IFabricStatefulServiceFactory,
22 IFabricStatefulServiceFactory_Impl, IFabricStatefulServicePartition,
23 IFabricStatefulServicePartition3, IFabricStatefulServiceReplica,
24 IFabricStatefulServiceReplica_Impl,
25 },
26 FabricTypes::{
27 FABRIC_EPOCH, FABRIC_REPLICA_INFORMATION, FABRIC_REPLICA_OPEN_MODE, FABRIC_REPLICA_ROLE,
28 FABRIC_REPLICA_SET_CONFIGURATION, FABRIC_REPLICA_SET_QUORUM_MODE, FABRIC_URI,
29 },
30};
31
32use crate::{
33 strings::WStringWrap,
34 sync::BridgeContext,
35 types::{Epoch, OpenMode, ReplicaInformation, ReplicaRole, ReplicaSetConfig},
36};
37
38use super::{
39 executor::Executor,
40 stateful::{PrimaryReplicator, Replicator, StatefulServiceFactory, StatefulServiceReplica},
41};
42#[implement(IFabricStatefulServiceFactory)]
47pub struct StatefulServiceFactoryBridge<E, F>
48where
49 E: Executor + 'static,
50 F: StatefulServiceFactory + 'static,
51{
52 inner: F,
53 rt: E,
54}
55
56impl<E, F> StatefulServiceFactoryBridge<E, F>
57where
58 E: Executor,
59 F: StatefulServiceFactory,
60{
61 pub fn create(factory: F, rt: E) -> StatefulServiceFactoryBridge<E, F> {
62 StatefulServiceFactoryBridge::<E, F> { inner: factory, rt }
63 }
64}
65
66impl<E, F> IFabricStatefulServiceFactory_Impl for StatefulServiceFactoryBridge_Impl<E, F>
67where
68 E: Executor,
69 F: StatefulServiceFactory,
70{
71 #[allow(clippy::not_unsafe_ptr_arg_deref)]
72 #[cfg_attr(
73 feature = "tracing",
74 tracing::instrument(skip_all, ret(level = "debug"), err)
75 )]
76 fn CreateReplica(
77 &self,
78 servicetypename: &crate::PCWSTR,
79 servicename: FABRIC_URI,
80 initializationdatalength: u32,
81 initializationdata: *const u8,
82 partitionid: &crate::GUID,
83 replicaid: i64,
84 ) -> crate::WinResult<IFabricStatefulServiceReplica> {
85 let p_servicename = crate::PCWSTR::from_raw(servicename.0);
86 let h_servicename = WStringWrap::from(p_servicename).into();
87 let h_servicetypename = WStringWrap::from(*servicetypename).into();
88 let data = unsafe {
89 if !initializationdata.is_null() {
90 std::slice::from_raw_parts(initializationdata, initializationdatalength as usize)
91 } else {
92 &[]
93 }
94 };
95
96 let replica = self.inner.create_replica(
97 &h_servicetypename,
98 &h_servicename,
99 data,
100 partitionid,
101 replicaid,
102 )?;
103 let rt = self.rt.clone();
104 let replica_bridge = IFabricStatefulServiceReplicaBridge::create(replica, rt);
105 Ok(replica_bridge.into())
106 }
107}
108
109#[implement(IFabricReplicator)]
115
116pub struct IFabricReplicatorBridge<E, R>
117where
118 E: Executor,
119 R: Replicator,
120{
121 inner: Arc<R>,
122 rt: E,
123}
124
125impl<E, R> IFabricReplicatorBridge<E, R>
126where
127 E: Executor,
128 R: Replicator,
129{
130 pub fn create(rplctr: R, rt: E) -> IFabricReplicatorBridge<E, R> {
131 IFabricReplicatorBridge {
132 inner: Arc::new(rplctr),
133 rt,
134 }
135 }
136
137 fn create_from_primary_replicator(replicator: Arc<R>, rt: E) -> IFabricReplicatorBridge<E, R> {
138 IFabricReplicatorBridge {
139 inner: replicator,
140 rt,
141 }
142 }
143}
144
145impl<E, R> IFabricReplicator_Impl for IFabricReplicatorBridge_Impl<E, R>
146where
147 E: Executor,
148 R: Replicator,
149{
150 #[cfg_attr(
151 feature = "tracing",
152 tracing::instrument(skip_all, ret(level = "debug"), err)
153 )]
154 fn BeginOpen(
155 &self,
156 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
157 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
158 let inner = self.inner.clone();
159 let (ctx, token) = BridgeContext::make(callback);
160 ctx.spawn(&self.rt, async move {
161 inner
162 .open(token)
163 .await
164 .map(|s| IFabricStringResult::from(WStringWrap::from(s)))
165 .map_err(crate::WinError::from)
166 })
167 }
168
169 #[cfg_attr(
170 feature = "tracing",
171 tracing::instrument(skip_all, ret(level = "debug"), err)
172 )]
173 fn EndOpen(
174 &self,
175 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
176 ) -> crate::WinResult<IFabricStringResult> {
177 BridgeContext::result(context)?
178 }
179
180 #[allow(clippy::not_unsafe_ptr_arg_deref)]
181 #[cfg_attr(
182 feature = "tracing",
183 tracing::instrument(skip_all, ret(level = "debug"), err)
184 )]
185 fn BeginChangeRole(
186 &self,
187 epoch: *const FABRIC_EPOCH,
188 role: FABRIC_REPLICA_ROLE,
189 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
190 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
191 let inner = self.inner.clone();
192 let epoch2: Epoch = unsafe { epoch.as_ref().unwrap().into() };
193 let role2: ReplicaRole = (&role).into();
194
195 let (ctx, token) = BridgeContext::make(callback);
196 ctx.spawn(&self.rt, async move {
197 inner
198 .change_role(&epoch2, &role2, token)
199 .await
200 .map_err(crate::WinError::from)
201 })
202 }
203 #[cfg_attr(
204 feature = "tracing",
205 tracing::instrument(skip_all, ret(level = "debug"), err)
206 )]
207 fn EndChangeRole(
208 &self,
209 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
210 ) -> crate::WinResult<()> {
211 BridgeContext::result(context)?
212 }
213
214 #[allow(clippy::not_unsafe_ptr_arg_deref)]
215 #[cfg_attr(
216 feature = "tracing",
217 tracing::instrument(skip_all, ret(level = "debug"), err)
218 )]
219 fn BeginUpdateEpoch(
220 &self,
221 epoch: *const FABRIC_EPOCH,
222 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
223 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
224 let inner = self.inner.clone();
225 let epoch2: Epoch = unsafe { epoch.as_ref().unwrap().into() };
226 let (ctx, token) = BridgeContext::make(callback);
227 ctx.spawn(&self.rt, async move {
228 inner
229 .update_epoch(&epoch2, token)
230 .await
231 .map_err(crate::WinError::from)
232 })
233 }
234
235 #[cfg_attr(
236 feature = "tracing",
237 tracing::instrument(skip_all, ret(level = "debug"), err)
238 )]
239 fn EndUpdateEpoch(
240 &self,
241 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
242 ) -> crate::WinResult<()> {
243 BridgeContext::result(context)?
244 }
245
246 #[cfg_attr(
247 feature = "tracing",
248 tracing::instrument(skip_all, ret(level = "debug"), err)
249 )]
250 fn BeginClose(
251 &self,
252 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
253 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
254 let inner = self.inner.clone();
255 let (ctx, token) = BridgeContext::make(callback);
256 ctx.spawn(&self.rt, async move {
257 inner.close(token).await.map_err(crate::WinError::from)
258 })
259 }
260
261 #[cfg_attr(
262 feature = "tracing",
263 tracing::instrument(skip_all, ret(level = "debug"), err)
264 )]
265 fn EndClose(
266 &self,
267 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
268 ) -> crate::WinResult<()> {
269 BridgeContext::result(context)?
270 }
271
272 #[cfg_attr(
273 feature = "tracing",
274 tracing::instrument(skip_all, ret(level = "debug"))
275 )]
276 fn Abort(&self) {
277 self.inner.abort();
278 }
279
280 #[cfg_attr(
281 feature = "tracing",
282 tracing::instrument(skip_all, ret(level = "debug"), err)
283 )]
284 fn GetCurrentProgress(&self) -> crate::WinResult<i64> {
285 let lsn = self.inner.get_current_progress();
286 lsn.map_err(crate::WinError::from)
287 }
288
289 #[cfg_attr(
290 feature = "tracing",
291 tracing::instrument(skip_all, ret(level = "debug"), err)
292 )]
293 fn GetCatchUpCapability(&self) -> crate::WinResult<i64> {
294 let lsn = self.inner.get_catch_up_capability();
295 lsn.map_err(crate::WinError::from)
296 }
297}
298
299#[implement(IFabricPrimaryReplicator, IFabricReplicatorCatchupSpecificQuorum)]
310pub struct IFabricPrimaryReplicatorBridge<E, P>
311where
312 E: Executor,
313 P: PrimaryReplicator,
314{
315 inner: Arc<P>,
316 rt: E,
317 rplctr: IFabricReplicator,
318}
319
320impl<E, P> IFabricPrimaryReplicatorBridge<E, P>
321where
322 E: Executor,
323 P: PrimaryReplicator,
324{
325 pub fn create(rplctr: P, rt: E) -> IFabricPrimaryReplicatorBridge<E, P> {
326 let inner = Arc::new(rplctr);
327
328 let replicator_bridge =
338 IFabricReplicatorBridge::create_from_primary_replicator(inner.clone(), rt.clone());
339
340 IFabricPrimaryReplicatorBridge {
341 inner,
342 rt,
343 rplctr: replicator_bridge.into(),
344 }
345 }
346}
347
348impl<E, P> IFabricReplicator_Impl for IFabricPrimaryReplicatorBridge_Impl<E, P>
350where
351 E: Executor,
352 P: PrimaryReplicator,
353{
354 fn BeginOpen(
355 &self,
356 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
357 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
358 unsafe { self.rplctr.BeginOpen(callback.as_ref()) }
359 }
360
361 fn EndOpen(
362 &self,
363 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
364 ) -> crate::WinResult<IFabricStringResult> {
365 unsafe { self.rplctr.EndOpen(context.as_ref()) }
366 }
367
368 #[allow(clippy::not_unsafe_ptr_arg_deref)]
369 fn BeginChangeRole(
370 &self,
371 epoch: *const FABRIC_EPOCH,
372 role: FABRIC_REPLICA_ROLE,
373 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
374 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
375 unsafe { self.rplctr.BeginChangeRole(epoch, role, callback.as_ref()) }
376 }
377
378 fn EndChangeRole(
379 &self,
380 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
381 ) -> crate::WinResult<()> {
382 unsafe { self.rplctr.EndChangeRole(context.as_ref()) }
383 }
384
385 #[allow(clippy::not_unsafe_ptr_arg_deref)]
386 fn BeginUpdateEpoch(
387 &self,
388 epoch: *const FABRIC_EPOCH,
389 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
390 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
391 unsafe { self.rplctr.BeginUpdateEpoch(epoch, callback.as_ref()) }
392 }
393
394 fn EndUpdateEpoch(
395 &self,
396 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
397 ) -> crate::WinResult<()> {
398 unsafe { self.rplctr.EndUpdateEpoch(context.as_ref()) }
399 }
400
401 fn BeginClose(
402 &self,
403 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
404 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
405 unsafe { self.rplctr.BeginClose(callback.as_ref()) }
406 }
407
408 fn EndClose(
409 &self,
410 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
411 ) -> crate::WinResult<()> {
412 unsafe { self.rplctr.EndClose(context.as_ref()) }
413 }
414
415 fn Abort(&self) {
416 unsafe { self.rplctr.Abort() }
417 }
418
419 fn GetCurrentProgress(&self) -> crate::WinResult<i64> {
420 unsafe { self.rplctr.GetCurrentProgress() }
421 }
422
423 fn GetCatchUpCapability(&self) -> crate::WinResult<i64> {
424 unsafe { self.rplctr.GetCatchUpCapability() }
425 }
426}
427
428impl<E, P> IFabricPrimaryReplicator_Impl for IFabricPrimaryReplicatorBridge_Impl<E, P>
429where
430 E: Executor,
431 P: PrimaryReplicator,
432{
433 #[cfg_attr(
434 feature = "tracing",
435 tracing::instrument(skip_all, ret(level = "debug"), err)
436 )]
437 fn BeginOnDataLoss(
438 &self,
439 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
440 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
441 let inner = self.inner.clone();
442
443 let (ctx, token) = BridgeContext::make(callback);
444 ctx.spawn(&self.rt, async move {
445 inner
446 .on_data_loss(token)
447 .await
448 .map_err(crate::WinError::from)
449 })
450 }
451
452 #[cfg_attr(
453 feature = "tracing",
454 tracing::instrument(skip_all, ret(level = "debug"), err)
455 )]
456 fn EndOnDataLoss(
457 &self,
458 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
459 ) -> crate::WinResult<u8> {
460 BridgeContext::result(context)?
461 }
462
463 #[allow(clippy::not_unsafe_ptr_arg_deref)]
464 #[cfg_attr(
465 feature = "tracing",
466 tracing::instrument(skip_all, ret(level = "debug"), err)
467 )]
468 fn UpdateCatchUpReplicaSetConfiguration(
469 &self,
470 currentconfiguration: *const FABRIC_REPLICA_SET_CONFIGURATION,
471 previousconfiguration: *const FABRIC_REPLICA_SET_CONFIGURATION,
472 ) -> crate::WinResult<()> {
473 let cc = ReplicaSetConfig::from(unsafe { currentconfiguration.as_ref().unwrap() });
474 let pc = ReplicaSetConfig::from(unsafe { previousconfiguration.as_ref().unwrap() });
475 self.inner
476 .update_catch_up_replica_set_configuration(&cc, &pc)
477 .map_err(crate::WinError::from)
478 }
479
480 #[cfg_attr(
481 feature = "tracing",
482 tracing::instrument(skip_all, ret(level = "debug"), err)
483 )]
484 fn BeginWaitForCatchUpQuorum(
485 &self,
486 catchupmode: FABRIC_REPLICA_SET_QUORUM_MODE,
487 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
488 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
489 let catchupmode = catchupmode.into();
490 let inner = self.inner.clone();
491 let (ctx, token) = BridgeContext::make(callback);
492 ctx.spawn(&self.rt, async move {
493 inner
494 .wait_for_catch_up_quorum(catchupmode, token)
495 .await
496 .map_err(crate::WinError::from)
497 })
498 }
499
500 #[cfg_attr(
501 feature = "tracing",
502 tracing::instrument(skip_all, ret(level = "debug"), err)
503 )]
504 fn EndWaitForCatchUpQuorum(
505 &self,
506 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
507 ) -> crate::WinResult<()> {
508 BridgeContext::result(context)?
509 }
510
511 #[allow(clippy::not_unsafe_ptr_arg_deref)]
512 #[cfg_attr(
513 feature = "tracing",
514 tracing::instrument(skip_all, ret(level = "debug"), err)
515 )]
516 fn UpdateCurrentReplicaSetConfiguration(
517 &self,
518 currentconfiguration: *const FABRIC_REPLICA_SET_CONFIGURATION,
519 ) -> crate::WinResult<()> {
520 let c = ReplicaSetConfig::from(unsafe { currentconfiguration.as_ref() }.unwrap());
521 self.inner
522 .update_current_replica_set_configuration(&c)
523 .map_err(crate::WinError::from)
524 }
525
526 #[allow(clippy::not_unsafe_ptr_arg_deref)]
527 #[cfg_attr(
528 feature = "tracing",
529 tracing::instrument(skip_all, ret(level = "debug"), err)
530 )]
531 fn BeginBuildReplica(
532 &self,
533 replica: *const FABRIC_REPLICA_INFORMATION,
534 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
535 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
536 let inner = self.inner.clone();
537 let r = ReplicaInformation::from(unsafe { replica.as_ref().unwrap() });
538 debug_assert_eq!(r.role, ReplicaRole::IdleSecondary);
540 debug_assert_eq!(r.catch_up_capability, -1);
541 debug_assert_eq!(r.current_progress, -1);
542
543 let (ctx, token) = BridgeContext::make(callback);
544 ctx.spawn(&self.rt, async move {
545 inner
546 .build_replica(&r, token)
547 .await
548 .map_err(crate::WinError::from)
549 })
550 }
551
552 #[cfg_attr(
553 feature = "tracing",
554 tracing::instrument(skip_all, ret(level = "debug"), err)
555 )]
556 fn EndBuildReplica(
557 &self,
558 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
559 ) -> crate::WinResult<()> {
560 BridgeContext::result(context)?
561 }
562
563 #[cfg_attr(
564 feature = "tracing",
565 tracing::instrument(skip_all, ret(level = "debug"), err)
566 )]
567 fn RemoveReplica(&self, replicaid: i64) -> crate::WinResult<()> {
568 self.inner
569 .remove_replica(replicaid)
570 .map_err(crate::WinError::from)
571 }
572}
573
574impl<E, P> IFabricReplicatorCatchupSpecificQuorum_Impl for IFabricPrimaryReplicatorBridge_Impl<E, P>
575where
576 E: Executor,
577 P: PrimaryReplicator,
578{
579}
580#[implement(IFabricStatefulServiceReplica)]
587
588pub struct IFabricStatefulServiceReplicaBridge<E, R>
589where
590 E: Executor,
591 R: StatefulServiceReplica + 'static,
592{
593 inner: Arc<R>,
594 rt: E,
595}
596
597impl<E, R> IFabricStatefulServiceReplicaBridge<E, R>
598where
599 E: Executor,
600 R: StatefulServiceReplica,
601{
602 pub fn create(rplctr: R, rt: E) -> IFabricStatefulServiceReplicaBridge<E, R> {
603 IFabricStatefulServiceReplicaBridge {
604 inner: Arc::new(rplctr),
605 rt,
606 }
607 }
608}
609
610impl<E, R> IFabricStatefulServiceReplica_Impl for IFabricStatefulServiceReplicaBridge_Impl<E, R>
611where
612 E: Executor,
613 R: StatefulServiceReplica,
614{
615 #[cfg_attr(
616 feature = "tracing",
617 tracing::instrument(skip_all, ret(level = "debug"), err)
618 )]
619 fn BeginOpen(
620 &self,
621 openmode: FABRIC_REPLICA_OPEN_MODE,
622 partition: windows_core::Ref<IFabricStatefulServicePartition>,
623 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
624 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
625 let inner = self.inner.clone();
626 let rt_cp = self.rt.clone();
627 let openmode2: OpenMode = openmode.into();
628 let com_partition = partition
629 .unwrap()
630 .cast::<IFabricStatefulServicePartition3>()
631 .expect("cannot query interface");
632 let partition = StatefulServicePartition::from(&com_partition);
633 let (ctx, token) = BridgeContext::make(callback);
634 ctx.spawn(&self.rt, async move {
635 inner
636 .open(openmode2, &partition, token)
637 .await
638 .map(|s| {
639 let bridge: IFabricPrimaryReplicator =
640 IFabricPrimaryReplicatorBridge::create(s, rt_cp).into();
641 bridge.clone().cast::<IFabricReplicator>().unwrap()
642 })
643 .map_err(crate::WinError::from)
644 })
645 }
646
647 #[cfg_attr(
648 feature = "tracing",
649 tracing::instrument(skip_all, ret(level = "debug"), err)
650 )]
651 fn EndOpen(
652 &self,
653 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
654 ) -> crate::WinResult<IFabricReplicator> {
655 BridgeContext::result(context)?
656 }
657
658 #[cfg_attr(
659 feature = "tracing",
660 tracing::instrument(skip_all, ret(level = "debug"), err)
661 )]
662 fn BeginChangeRole(
663 &self,
664 newrole: FABRIC_REPLICA_ROLE,
665 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
666 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
667 let inner = self.inner.clone();
668 let newrole2: ReplicaRole = (&newrole).into();
669 let (ctx, token) = BridgeContext::make(callback);
670 ctx.spawn(&self.rt, async move {
671 inner
672 .change_role(newrole2, token)
673 .await
674 .map(|s| IFabricStringResult::from(WStringWrap::from(s)))
675 .map_err(crate::WinError::from)
676 })
677 }
678
679 #[cfg_attr(
680 feature = "tracing",
681 tracing::instrument(skip_all, ret(level = "debug"), err)
682 )]
683 fn EndChangeRole(
684 &self,
685 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
686 ) -> crate::WinResult<IFabricStringResult> {
687 BridgeContext::result(context)?
688 }
689
690 #[cfg_attr(
691 feature = "tracing",
692 tracing::instrument(skip_all, ret(level = "debug"), err)
693 )]
694 fn BeginClose(
695 &self,
696 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
697 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
698 let inner = self.inner.clone();
699 let (ctx, token) = BridgeContext::make(callback);
700 ctx.spawn(&self.rt, async move {
701 inner.close(token).await.map_err(crate::WinError::from)
702 })
703 }
704
705 #[cfg_attr(
706 feature = "tracing",
707 tracing::instrument(skip_all, ret(level = "debug"), err)
708 )]
709 fn EndClose(
710 &self,
711 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
712 ) -> crate::WinResult<()> {
713 BridgeContext::result(context)?
714 }
715
716 #[cfg_attr(
717 feature = "tracing",
718 tracing::instrument(skip_all, ret(level = "debug"))
719 )]
720 fn Abort(&self) {
721 self.inner.as_ref().abort();
722 }
723}
724
725