1#![allow(non_camel_case_types)]
10
11use std::sync::Arc;
12
13use crate::{
14 Interface, runtime::stateful_proxy::StatefulServicePartition, strings::StringResult, types::Uri,
15};
16use windows_core::implement;
17
18use mssf_com::{
19 FabricCommon::IFabricStringResult,
20 FabricRuntime::{
21 IFabricPrimaryReplicator, IFabricPrimaryReplicator_Impl, IFabricReplicator,
22 IFabricReplicator_Impl, IFabricReplicatorCatchupSpecificQuorum,
23 IFabricReplicatorCatchupSpecificQuorum_Impl, IFabricStatefulServiceFactory,
24 IFabricStatefulServiceFactory_Impl, IFabricStatefulServicePartition,
25 IFabricStatefulServicePartition3, IFabricStatefulServiceReplica,
26 IFabricStatefulServiceReplica_Impl,
27 },
28 FabricTypes::{
29 FABRIC_EPOCH, FABRIC_REPLICA_INFORMATION, FABRIC_REPLICA_OPEN_MODE, FABRIC_REPLICA_ROLE,
30 FABRIC_REPLICA_SET_CONFIGURATION, FABRIC_REPLICA_SET_QUORUM_MODE, FABRIC_URI,
31 },
32};
33
34use crate::{
35 WString,
36 sync::BridgeContext,
37 types::{Epoch, OpenMode, ReplicaInformation, ReplicaRole, ReplicaSetConfig},
38};
39
40use super::{
41 IPrimaryReplicator, IReplicator, IStatefulServiceFactory, IStatefulServiceReplica,
42 executor::Executor,
43};
44#[implement(IFabricStatefulServiceFactory)]
49pub struct StatefulServiceFactoryBridge<E>
50where
51 E: Executor + 'static,
52{
53 inner: Box<dyn IStatefulServiceFactory>,
54 rt: E,
55}
56
57impl<E> StatefulServiceFactoryBridge<E>
58where
59 E: Executor,
60{
61 pub fn create(
62 factory: Box<dyn IStatefulServiceFactory>,
63 rt: E,
64 ) -> StatefulServiceFactoryBridge<E> {
65 StatefulServiceFactoryBridge { inner: factory, rt }
66 }
67}
68
69impl<E> IFabricStatefulServiceFactory_Impl for StatefulServiceFactoryBridge_Impl<E>
70where
71 E: Executor,
72{
73 #[allow(clippy::not_unsafe_ptr_arg_deref)]
74 #[cfg_attr(
75 feature = "tracing",
76 tracing::instrument(skip_all, ret(level = "debug"), err)
77 )]
78 fn CreateReplica(
79 &self,
80 servicetypename: &crate::PCWSTR,
81 servicename: FABRIC_URI,
82 initializationdatalength: u32,
83 initializationdata: *const u8,
84 partitionid: &crate::GUID,
85 replicaid: i64,
86 ) -> crate::WinResult<IFabricStatefulServiceReplica> {
87 let h_servicename = Uri::from(servicename);
88 let h_servicetypename = WString::from(*servicetypename);
89 let data = unsafe {
90 if !initializationdata.is_null() {
91 std::slice::from_raw_parts(initializationdata, initializationdatalength as usize)
92 } else {
93 &[]
94 }
95 };
96
97 let replica = self.inner.create_replica(
98 h_servicetypename,
99 h_servicename,
100 data,
101 *partitionid,
102 replicaid,
103 )?;
104 let rt = self.rt.clone();
105 let replica_bridge = IFabricStatefulServiceReplicaBridge::create(replica, rt);
106 Ok(replica_bridge.into())
107 }
108}
109
110#[implement(IFabricReplicator)]
116
117pub struct IFabricReplicatorBridge<E>
118where
119 E: Executor,
120{
121 inner: Arc<Box<dyn IReplicator>>,
122 rt: E,
123}
124
125impl<E> IFabricReplicatorBridge<E>
126where
127 E: Executor,
128{
129 pub fn create(rplctr: Box<dyn IReplicator>, rt: E) -> IFabricReplicatorBridge<E> {
130 IFabricReplicatorBridge {
131 inner: Arc::new(rplctr),
132 rt,
133 }
134 }
135
136 fn create_from_primary_replicator(
137 replicator: Arc<Box<dyn IReplicator>>,
138 rt: E,
139 ) -> IFabricReplicatorBridge<E> {
140 IFabricReplicatorBridge {
141 inner: replicator,
142 rt,
143 }
144 }
145}
146
147impl<E> IFabricReplicator_Impl for IFabricReplicatorBridge_Impl<E>
148where
149 E: Executor,
150{
151 #[cfg_attr(
152 feature = "tracing",
153 tracing::instrument(skip_all, ret(level = "debug"), err)
154 )]
155 fn BeginOpen(
156 &self,
157 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
158 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
159 let inner = self.inner.clone();
160 let (ctx, token) = BridgeContext::make(callback);
161 ctx.spawn(&self.rt, async move {
162 inner
163 .open(token)
164 .await
165 .map(|s| IFabricStringResult::from(StringResult::new(s)))
166 .map_err(crate::WinError::from)
167 })
168 }
169
170 #[cfg_attr(
171 feature = "tracing",
172 tracing::instrument(skip_all, ret(level = "debug"), err)
173 )]
174 fn EndOpen(
175 &self,
176 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
177 ) -> crate::WinResult<IFabricStringResult> {
178 BridgeContext::result(context)?
179 }
180
181 #[allow(clippy::not_unsafe_ptr_arg_deref)]
182 #[cfg_attr(
183 feature = "tracing",
184 tracing::instrument(skip_all, ret(level = "debug"), err)
185 )]
186 fn BeginChangeRole(
187 &self,
188 epoch: *const FABRIC_EPOCH,
189 role: FABRIC_REPLICA_ROLE,
190 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
191 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
192 let inner = self.inner.clone();
193 let epoch2: Epoch = unsafe { epoch.as_ref().unwrap().into() };
194 let role2: ReplicaRole = (&role).into();
195
196 let (ctx, token) = BridgeContext::make(callback);
197 ctx.spawn(&self.rt, async move {
198 inner
199 .change_role(epoch2, role2, token)
200 .await
201 .map_err(crate::WinError::from)
202 })
203 }
204 #[cfg_attr(
205 feature = "tracing",
206 tracing::instrument(skip_all, ret(level = "debug"), err)
207 )]
208 fn EndChangeRole(
209 &self,
210 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
211 ) -> crate::WinResult<()> {
212 BridgeContext::result(context)?
213 }
214
215 #[allow(clippy::not_unsafe_ptr_arg_deref)]
216 #[cfg_attr(
217 feature = "tracing",
218 tracing::instrument(skip_all, ret(level = "debug"), err)
219 )]
220 fn BeginUpdateEpoch(
221 &self,
222 epoch: *const FABRIC_EPOCH,
223 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
224 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
225 let inner = self.inner.clone();
226 let epoch2: Epoch = unsafe { epoch.as_ref().unwrap().into() };
227 let (ctx, token) = BridgeContext::make(callback);
228 ctx.spawn(&self.rt, async move {
229 inner
230 .update_epoch(epoch2, token)
231 .await
232 .map_err(crate::WinError::from)
233 })
234 }
235
236 #[cfg_attr(
237 feature = "tracing",
238 tracing::instrument(skip_all, ret(level = "debug"), err)
239 )]
240 fn EndUpdateEpoch(
241 &self,
242 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
243 ) -> crate::WinResult<()> {
244 BridgeContext::result(context)?
245 }
246
247 #[cfg_attr(
248 feature = "tracing",
249 tracing::instrument(skip_all, ret(level = "debug"), err)
250 )]
251 fn BeginClose(
252 &self,
253 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
254 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
255 let inner = self.inner.clone();
256 let (ctx, token) = BridgeContext::make(callback);
257 ctx.spawn(&self.rt, async move {
258 inner.close(token).await.map_err(crate::WinError::from)
259 })
260 }
261
262 #[cfg_attr(
263 feature = "tracing",
264 tracing::instrument(skip_all, ret(level = "debug"), err)
265 )]
266 fn EndClose(
267 &self,
268 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
269 ) -> crate::WinResult<()> {
270 BridgeContext::result(context)?
271 }
272
273 #[cfg_attr(
274 feature = "tracing",
275 tracing::instrument(skip_all, ret(level = "debug"))
276 )]
277 fn Abort(&self) {
278 self.inner.abort();
279 }
280
281 #[cfg_attr(
282 feature = "tracing",
283 tracing::instrument(skip_all, ret(level = "debug"), err)
284 )]
285 fn GetCurrentProgress(&self) -> crate::WinResult<i64> {
286 let lsn = self.inner.get_current_progress();
287 lsn.map_err(crate::WinError::from)
288 }
289
290 #[cfg_attr(
291 feature = "tracing",
292 tracing::instrument(skip_all, ret(level = "debug"), err)
293 )]
294 fn GetCatchUpCapability(&self) -> crate::WinResult<i64> {
295 let lsn = self.inner.get_catch_up_capability();
296 lsn.map_err(crate::WinError::from)
297 }
298}
299
300#[implement(IFabricPrimaryReplicator, IFabricReplicatorCatchupSpecificQuorum)]
311pub struct IFabricPrimaryReplicatorBridge<E>
312where
313 E: Executor,
314{
315 inner: Arc<Box<dyn IPrimaryReplicator>>,
316 rt: E,
317 rplctr: IFabricReplicator,
318}
319
320impl<E> IFabricPrimaryReplicatorBridge<E>
321where
322 E: Executor,
323{
324 pub fn create(rplctr: Box<dyn IPrimaryReplicator>, rt: E) -> IFabricPrimaryReplicatorBridge<E> {
325 let inner = Arc::new(rplctr);
326
327 let raw: *const Box<dyn IPrimaryReplicator> = Arc::into_raw(inner.clone());
329 let raw: *const Box<dyn IReplicator> = raw.cast();
330
331 let rpl_cast = unsafe { Arc::from_raw(raw) };
335
336 let replicator_bridge =
337 IFabricReplicatorBridge::create_from_primary_replicator(rpl_cast, rt.clone());
338
339 IFabricPrimaryReplicatorBridge {
340 inner,
341 rt,
342 rplctr: replicator_bridge.into(),
343 }
344 }
345}
346
347impl<E> IFabricReplicator_Impl for IFabricPrimaryReplicatorBridge_Impl<E>
349where
350 E: Executor,
351{
352 fn BeginOpen(
353 &self,
354 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
355 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
356 unsafe { self.rplctr.BeginOpen(callback.as_ref()) }
357 }
358
359 fn EndOpen(
360 &self,
361 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
362 ) -> crate::WinResult<IFabricStringResult> {
363 unsafe { self.rplctr.EndOpen(context.as_ref()) }
364 }
365
366 #[allow(clippy::not_unsafe_ptr_arg_deref)]
367 fn BeginChangeRole(
368 &self,
369 epoch: *const FABRIC_EPOCH,
370 role: FABRIC_REPLICA_ROLE,
371 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
372 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
373 unsafe { self.rplctr.BeginChangeRole(epoch, role, callback.as_ref()) }
374 }
375
376 fn EndChangeRole(
377 &self,
378 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
379 ) -> crate::WinResult<()> {
380 unsafe { self.rplctr.EndChangeRole(context.as_ref()) }
381 }
382
383 #[allow(clippy::not_unsafe_ptr_arg_deref)]
384 fn BeginUpdateEpoch(
385 &self,
386 epoch: *const FABRIC_EPOCH,
387 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
388 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
389 unsafe { self.rplctr.BeginUpdateEpoch(epoch, callback.as_ref()) }
390 }
391
392 fn EndUpdateEpoch(
393 &self,
394 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
395 ) -> crate::WinResult<()> {
396 unsafe { self.rplctr.EndUpdateEpoch(context.as_ref()) }
397 }
398
399 fn BeginClose(
400 &self,
401 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
402 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
403 unsafe { self.rplctr.BeginClose(callback.as_ref()) }
404 }
405
406 fn EndClose(
407 &self,
408 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
409 ) -> crate::WinResult<()> {
410 unsafe { self.rplctr.EndClose(context.as_ref()) }
411 }
412
413 fn Abort(&self) {
414 unsafe { self.rplctr.Abort() }
415 }
416
417 fn GetCurrentProgress(&self) -> crate::WinResult<i64> {
418 unsafe { self.rplctr.GetCurrentProgress() }
419 }
420
421 fn GetCatchUpCapability(&self) -> crate::WinResult<i64> {
422 unsafe { self.rplctr.GetCatchUpCapability() }
423 }
424}
425
426impl<E> IFabricPrimaryReplicator_Impl for IFabricPrimaryReplicatorBridge_Impl<E>
427where
428 E: Executor,
429{
430 #[cfg_attr(
431 feature = "tracing",
432 tracing::instrument(skip_all, ret(level = "debug"), err)
433 )]
434 fn BeginOnDataLoss(
435 &self,
436 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
437 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
438 let inner = self.inner.clone();
439
440 let (ctx, token) = BridgeContext::make(callback);
441 ctx.spawn(&self.rt, async move {
442 inner
443 .on_data_loss(token)
444 .await
445 .map_err(crate::WinError::from)
446 })
447 }
448
449 #[cfg_attr(
450 feature = "tracing",
451 tracing::instrument(skip_all, ret(level = "debug"), err)
452 )]
453 fn EndOnDataLoss(
454 &self,
455 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
456 ) -> crate::WinResult<u8> {
457 BridgeContext::result(context)?
458 }
459
460 #[allow(clippy::not_unsafe_ptr_arg_deref)]
461 #[cfg_attr(
462 feature = "tracing",
463 tracing::instrument(skip_all, ret(level = "debug"), err)
464 )]
465 fn UpdateCatchUpReplicaSetConfiguration(
466 &self,
467 currentconfiguration: *const FABRIC_REPLICA_SET_CONFIGURATION,
468 previousconfiguration: *const FABRIC_REPLICA_SET_CONFIGURATION,
469 ) -> crate::WinResult<()> {
470 let cc = ReplicaSetConfig::from(unsafe { currentconfiguration.as_ref().unwrap() });
471 let pc = ReplicaSetConfig::from(unsafe { previousconfiguration.as_ref().unwrap() });
472 self.inner
473 .update_catch_up_replica_set_configuration(cc, pc)
474 .map_err(crate::WinError::from)
475 }
476
477 #[cfg_attr(
478 feature = "tracing",
479 tracing::instrument(skip_all, ret(level = "debug"), err)
480 )]
481 fn BeginWaitForCatchUpQuorum(
482 &self,
483 catchupmode: FABRIC_REPLICA_SET_QUORUM_MODE,
484 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
485 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
486 let catchupmode = catchupmode.into();
487 let inner = self.inner.clone();
488 let (ctx, token) = BridgeContext::make(callback);
489 ctx.spawn(&self.rt, async move {
490 inner
491 .wait_for_catch_up_quorum(catchupmode, token)
492 .await
493 .map_err(crate::WinError::from)
494 })
495 }
496
497 #[cfg_attr(
498 feature = "tracing",
499 tracing::instrument(skip_all, ret(level = "debug"), err)
500 )]
501 fn EndWaitForCatchUpQuorum(
502 &self,
503 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
504 ) -> crate::WinResult<()> {
505 BridgeContext::result(context)?
506 }
507
508 #[allow(clippy::not_unsafe_ptr_arg_deref)]
509 #[cfg_attr(
510 feature = "tracing",
511 tracing::instrument(skip_all, ret(level = "debug"), err)
512 )]
513 fn UpdateCurrentReplicaSetConfiguration(
514 &self,
515 currentconfiguration: *const FABRIC_REPLICA_SET_CONFIGURATION,
516 ) -> crate::WinResult<()> {
517 let c = ReplicaSetConfig::from(unsafe { currentconfiguration.as_ref() }.unwrap());
518 self.inner
519 .update_current_replica_set_configuration(c)
520 .map_err(crate::WinError::from)
521 }
522
523 #[allow(clippy::not_unsafe_ptr_arg_deref)]
524 #[cfg_attr(
525 feature = "tracing",
526 tracing::instrument(skip_all, ret(level = "debug"), err)
527 )]
528 fn BeginBuildReplica(
529 &self,
530 replica: *const FABRIC_REPLICA_INFORMATION,
531 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
532 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
533 let inner = self.inner.clone();
534 let r = ReplicaInformation::from(unsafe { replica.as_ref().unwrap() });
535 debug_assert!(matches!(
537 r.role,
538 ReplicaRole::IdleSecondary | ReplicaRole::IdleAuxiliary
539 ));
540 debug_assert_eq!(r.catch_up_capability, -1);
541 debug_assert_eq!(r.current_progress, -1);
542
543 let (ctx, token) = BridgeContext::make(callback);
544 ctx.spawn(&self.rt, async move {
545 inner
546 .build_replica(r, token)
547 .await
548 .map_err(crate::WinError::from)
549 })
550 }
551
552 #[cfg_attr(
553 feature = "tracing",
554 tracing::instrument(skip_all, ret(level = "debug"), err)
555 )]
556 fn EndBuildReplica(
557 &self,
558 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
559 ) -> crate::WinResult<()> {
560 BridgeContext::result(context)?
561 }
562
563 #[cfg_attr(
564 feature = "tracing",
565 tracing::instrument(skip_all, ret(level = "debug"), err)
566 )]
567 fn RemoveReplica(&self, replicaid: i64) -> crate::WinResult<()> {
568 self.inner
569 .remove_replica(replicaid)
570 .map_err(crate::WinError::from)
571 }
572}
573
574impl<E> IFabricReplicatorCatchupSpecificQuorum_Impl for IFabricPrimaryReplicatorBridge_Impl<E> where
575 E: Executor
576{
577}
578#[implement(IFabricStatefulServiceReplica)]
585
586pub struct IFabricStatefulServiceReplicaBridge<E>
587where
588 E: Executor,
589{
590 inner: Arc<Box<dyn IStatefulServiceReplica>>,
591 rt: E,
592}
593
594impl<E> IFabricStatefulServiceReplicaBridge<E>
595where
596 E: Executor,
597{
598 pub fn create(
599 rplctr: Box<dyn IStatefulServiceReplica>,
600 rt: E,
601 ) -> IFabricStatefulServiceReplicaBridge<E> {
602 IFabricStatefulServiceReplicaBridge {
603 inner: Arc::new(rplctr),
604 rt,
605 }
606 }
607}
608
609impl<E> IFabricStatefulServiceReplica_Impl for IFabricStatefulServiceReplicaBridge_Impl<E>
610where
611 E: Executor,
612{
613 #[cfg_attr(
614 feature = "tracing",
615 tracing::instrument(skip_all, ret(level = "debug"), err)
616 )]
617 fn BeginOpen(
618 &self,
619 openmode: FABRIC_REPLICA_OPEN_MODE,
620 partition: windows_core::Ref<IFabricStatefulServicePartition>,
621 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
622 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
623 let inner = self.inner.clone();
624 let rt_cp = self.rt.clone();
625 let openmode2: OpenMode = openmode.into();
626 let com_partition = partition
627 .unwrap()
628 .cast::<IFabricStatefulServicePartition3>()
629 .expect("cannot query interface");
630 let partition = Arc::new(StatefulServicePartition::from(&com_partition));
631 let (ctx, token) = BridgeContext::make(callback);
632 ctx.spawn(&self.rt, async move {
633 inner
634 .open(openmode2, partition, token)
635 .await
636 .map(|s| {
637 let bridge: IFabricPrimaryReplicator =
638 IFabricPrimaryReplicatorBridge::create(s, rt_cp).into();
639 bridge.clone().cast::<IFabricReplicator>().unwrap()
640 })
641 .map_err(crate::WinError::from)
642 })
643 }
644
645 #[cfg_attr(
646 feature = "tracing",
647 tracing::instrument(skip_all, ret(level = "debug"), err)
648 )]
649 fn EndOpen(
650 &self,
651 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
652 ) -> crate::WinResult<IFabricReplicator> {
653 BridgeContext::result(context)?
654 }
655
656 #[cfg_attr(
657 feature = "tracing",
658 tracing::instrument(skip_all, ret(level = "debug"), err)
659 )]
660 fn BeginChangeRole(
661 &self,
662 newrole: FABRIC_REPLICA_ROLE,
663 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
664 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
665 let inner = self.inner.clone();
666 let newrole2: ReplicaRole = (&newrole).into();
667 let (ctx, token) = BridgeContext::make(callback);
668 ctx.spawn(&self.rt, async move {
669 inner
670 .change_role(newrole2, token)
671 .await
672 .map(|s| IFabricStringResult::from(StringResult::new(s)))
673 .map_err(crate::WinError::from)
674 })
675 }
676
677 #[cfg_attr(
678 feature = "tracing",
679 tracing::instrument(skip_all, ret(level = "debug"), err)
680 )]
681 fn EndChangeRole(
682 &self,
683 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
684 ) -> crate::WinResult<IFabricStringResult> {
685 BridgeContext::result(context)?
686 }
687
688 #[cfg_attr(
689 feature = "tracing",
690 tracing::instrument(skip_all, ret(level = "debug"), err)
691 )]
692 fn BeginClose(
693 &self,
694 callback: windows_core::Ref<super::IFabricAsyncOperationCallback>,
695 ) -> crate::WinResult<super::IFabricAsyncOperationContext> {
696 let inner = self.inner.clone();
697 let (ctx, token) = BridgeContext::make(callback);
698 ctx.spawn(&self.rt, async move {
699 inner.close(token).await.map_err(crate::WinError::from)
700 })
701 }
702
703 #[cfg_attr(
704 feature = "tracing",
705 tracing::instrument(skip_all, ret(level = "debug"), err)
706 )]
707 fn EndClose(
708 &self,
709 context: windows_core::Ref<super::IFabricAsyncOperationContext>,
710 ) -> crate::WinResult<()> {
711 BridgeContext::result(context)?
712 }
713
714 #[cfg_attr(
715 feature = "tracing",
716 tracing::instrument(skip_all, ret(level = "debug"))
717 )]
718 fn Abort(&self) {
719 self.inner.as_ref().abort();
720 }
721}
722
723