1use std::{ffi::c_void, sync::Arc};
10
11use crate::{Interface, WString, runtime::executor::BoxedCancelToken, strings::StringResult};
12use mssf_com::FabricRuntime::{
13 IFabricPrimaryReplicator, IFabricReplicator, IFabricReplicatorCatchupSpecificQuorum,
14 IFabricStatefulServicePartition3, IFabricStatefulServiceReplica,
15};
16
17use crate::{
18 error::ErrorCode,
19 sync::fabric_begin_end_proxy,
20 types::{
21 FaultType, HealthInformation, LoadMetric, LoadMetricListRef, MoveCost, ReplicaRole,
22 ServicePartitionAccessStatus, ServicePartitionInformation,
23 },
24};
25
26use super::{IPrimaryReplicator, IReplicator, IStatefulServiceReplica};
27use crate::types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuorumMode};
28
29pub struct StatefulServiceReplicaProxy {
30 com_impl: IFabricStatefulServiceReplica,
31}
32
33impl StatefulServiceReplicaProxy {
34 pub fn new(com_impl: IFabricStatefulServiceReplica) -> StatefulServiceReplicaProxy {
35 StatefulServiceReplicaProxy { com_impl }
36 }
37}
38
39#[async_trait::async_trait]
40impl IStatefulServiceReplica for StatefulServiceReplicaProxy {
41 #[cfg_attr(
42 feature = "tracing",
43 tracing::instrument(skip_all, level = "debug", err) )]
45 async fn open(
46 &self,
47 openmode: OpenMode,
48 partition: Arc<dyn super::IStatefulServicePartition>,
49 cancellation_token: BoxedCancelToken,
50 ) -> crate::Result<Box<dyn IPrimaryReplicator>> {
51 let com1 = &self.com_impl;
52 let com2 = self.com_impl.clone();
53 let rx = fabric_begin_end_proxy(
54 move |callback| unsafe {
55 com1.BeginOpen(
56 openmode.into(),
57 partition.try_get_com().expect("must support com"),
58 callback,
59 )
60 },
61 move |ctx| unsafe { com2.EndOpen(ctx) },
62 Some(cancellation_token),
63 );
64 let rplctr = rx.await??;
65
66 let catchup_specific_quorum = rplctr
68 .cast::<IFabricReplicatorCatchupSpecificQuorum>()
69 .is_ok();
70 assert!(
71 catchup_specific_quorum,
72 "mssf does not support replicator without catchup_specific_quorum interface"
73 );
74
75 let p_rplctr: IFabricPrimaryReplicator = rplctr.clone().cast().unwrap(); let res = Box::new(PrimaryReplicatorProxy::new(p_rplctr));
80 Ok(res)
81 }
82
83 #[cfg_attr(
84 feature = "tracing",
85 tracing::instrument(skip_all, level = "debug", ret, err)
86 )]
87 async fn change_role(
88 &self,
89 newrole: ReplicaRole,
90 cancellation_token: BoxedCancelToken,
91 ) -> crate::Result<WString> {
92 let com1 = &self.com_impl;
94 let com2 = self.com_impl.clone();
95 let rx = fabric_begin_end_proxy(
96 move |callback| unsafe { com1.BeginChangeRole((&newrole).into(), callback) },
97 move |ctx| unsafe { com2.EndChangeRole(ctx) },
98 Some(cancellation_token),
99 );
100 let addr = rx.await??;
101 Ok(StringResult::from(&addr).into_inner())
102 }
103
104 #[cfg_attr(
105 feature = "tracing",
106 tracing::instrument(skip_all, level = "debug", ret, err)
107 )]
108 async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
109 let com1 = &self.com_impl;
110 let com2 = self.com_impl.clone();
111 let rx = fabric_begin_end_proxy(
112 move |callback| unsafe { com1.BeginClose(callback) },
113 move |ctx| unsafe { com2.EndClose(ctx) },
114 Some(cancellation_token),
115 );
116 rx.await?.map_err(crate::Error::from)
117 }
118 #[cfg_attr(
119 feature = "tracing",
120 tracing::instrument(skip_all, level = "debug", ret)
121 )]
122 fn abort(&self) {
123 unsafe { self.com_impl.Abort() }
124 }
125}
126
127pub struct ReplicatorProxy {
128 com_impl: IFabricReplicator,
129}
130
131impl ReplicatorProxy {
132 fn new(com_impl: IFabricReplicator) -> ReplicatorProxy {
133 ReplicatorProxy { com_impl }
134 }
135}
136
137#[async_trait::async_trait]
138impl IReplicator for ReplicatorProxy {
139 #[cfg_attr(
140 feature = "tracing",
141 tracing::instrument(skip_all, level = "debug", ret, err)
142 )]
143 async fn open(&self, cancellation_token: BoxedCancelToken) -> crate::Result<WString> {
144 let com1 = &self.com_impl;
146 let com2 = self.com_impl.clone();
147 let rx = fabric_begin_end_proxy(
148 move |callback| unsafe { com1.BeginOpen(callback) },
149 move |ctx| unsafe { com2.EndOpen(ctx) },
150 Some(cancellation_token),
151 );
152 let addr = rx.await??;
153 Ok(StringResult::from(&addr).into_inner())
154 }
155 #[cfg_attr(
156 feature = "tracing",
157 tracing::instrument(skip_all, level = "debug", ret, err)
158 )]
159 async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
160 let com1 = &self.com_impl;
161 let com2 = self.com_impl.clone();
162 let rx = fabric_begin_end_proxy(
163 move |callback| unsafe { com1.BeginClose(callback) },
164 move |ctx| unsafe { com2.EndClose(ctx) },
165 Some(cancellation_token),
166 );
167 rx.await?.map_err(crate::Error::from)
168 }
169 #[cfg_attr(
170 feature = "tracing",
171 tracing::instrument(skip_all, level = "debug", ret, err)
172 )]
173 async fn change_role(
174 &self,
175 epoch: Epoch,
176 role: ReplicaRole,
177 cancellation_token: BoxedCancelToken,
178 ) -> crate::Result<()> {
179 let com1 = &self.com_impl;
180 let com2 = self.com_impl.clone();
181 let fabric_epoch: mssf_com::FabricTypes::FABRIC_EPOCH = (&epoch).into();
182 let rx = fabric_begin_end_proxy(
183 move |callback| unsafe {
184 com1.BeginChangeRole(&fabric_epoch, (&role).into(), callback)
185 },
186 move |ctx| unsafe { com2.EndChangeRole(ctx) },
187 Some(cancellation_token),
188 );
189 rx.await?.map_err(crate::Error::from)
190 }
191 #[cfg_attr(
192 feature = "tracing",
193 tracing::instrument(skip_all, level = "debug", ret, err)
194 )]
195 async fn update_epoch(
196 &self,
197 epoch: Epoch,
198 cancellation_token: BoxedCancelToken,
199 ) -> crate::Result<()> {
200 let com1 = &self.com_impl;
201 let com2 = self.com_impl.clone();
202 let fabric_epoch: mssf_com::FabricTypes::FABRIC_EPOCH = (&epoch).into();
203 let rx = fabric_begin_end_proxy(
204 move |callback| unsafe { com1.BeginUpdateEpoch(&fabric_epoch, callback) },
205 move |ctx| unsafe { com2.EndUpdateEpoch(ctx) },
206 Some(cancellation_token),
207 );
208 rx.await?.map_err(crate::Error::from)
209 }
210 #[cfg_attr(
211 feature = "tracing",
212 tracing::instrument(skip_all, level = "debug", ret, err)
213 )]
214 #[cfg_attr(
215 feature = "tracing",
216 tracing::instrument(skip_all, level = "debug", ret, err)
217 )]
218 fn get_current_progress(&self) -> crate::Result<i64> {
219 unsafe { self.com_impl.GetCurrentProgress() }.map_err(crate::Error::from)
220 }
221 #[cfg_attr(
222 feature = "tracing",
223 tracing::instrument(skip_all, level = "debug", ret, err)
224 )]
225 fn get_catch_up_capability(&self) -> crate::Result<i64> {
226 unsafe { self.com_impl.GetCatchUpCapability() }.map_err(crate::Error::from)
227 }
228 #[cfg_attr(
229 feature = "tracing",
230 tracing::instrument(skip_all, level = "debug", ret)
231 )]
232 fn abort(&self) {
233 unsafe { self.com_impl.Abort() }
234 }
235}
236
237pub struct PrimaryReplicatorProxy {
238 com_impl: IFabricPrimaryReplicator,
239 parent: ReplicatorProxy,
240}
241
242impl PrimaryReplicatorProxy {
243 pub fn new(com_impl: IFabricPrimaryReplicator) -> PrimaryReplicatorProxy {
244 let parent = ReplicatorProxy::new(com_impl.clone().cast().unwrap());
245 PrimaryReplicatorProxy { com_impl, parent }
246 }
247}
248
249#[async_trait::async_trait]
250impl IReplicator for PrimaryReplicatorProxy {
251 async fn open(&self, cancellation_token: BoxedCancelToken) -> crate::Result<WString> {
252 self.parent.open(cancellation_token).await
253 }
254 async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
255 self.parent.close(cancellation_token).await
256 }
257 async fn change_role(
258 &self,
259 epoch: Epoch,
260 role: ReplicaRole,
261 cancellation_token: BoxedCancelToken,
262 ) -> crate::Result<()> {
263 self.parent
264 .change_role(epoch, role, cancellation_token)
265 .await
266 }
267 async fn update_epoch(
268 &self,
269 epoch: Epoch,
270 cancellation_token: BoxedCancelToken,
271 ) -> crate::Result<()> {
272 self.parent.update_epoch(epoch, cancellation_token).await
273 }
274 fn get_current_progress(&self) -> crate::Result<i64> {
275 self.parent.get_current_progress()
276 }
277 fn get_catch_up_capability(&self) -> crate::Result<i64> {
278 self.parent.get_catch_up_capability()
279 }
280 fn abort(&self) {
281 self.parent.abort()
282 }
283}
284
285#[async_trait::async_trait]
286impl IPrimaryReplicator for PrimaryReplicatorProxy {
287 #[cfg_attr(
288 feature = "tracing",
289 tracing::instrument(skip_all, level = "debug", ret, err)
290 )]
291 async fn on_data_loss(&self, cancellation_token: BoxedCancelToken) -> crate::Result<u8> {
292 let com1 = &self.com_impl;
293 let com2 = self.com_impl.clone();
294 let rx = fabric_begin_end_proxy(
295 move |callback| unsafe { com1.BeginOnDataLoss(callback) },
296 move |ctx| unsafe { com2.EndOnDataLoss(ctx) },
297 Some(cancellation_token),
298 );
299 rx.await?.map_err(crate::Error::from)
300 }
301 #[cfg_attr(
302 feature = "tracing",
303 tracing::instrument(skip_all, level = "debug", ret, err)
304 )]
305 fn update_catch_up_replica_set_configuration(
306 &self,
307 currentconfiguration: ReplicaSetConfig,
308 previousconfiguration: ReplicaSetConfig,
309 ) -> crate::Result<()> {
310 let cc_view = currentconfiguration.get_view();
311 let pc_view = previousconfiguration.get_view();
312 unsafe {
313 self.com_impl
314 .UpdateCatchUpReplicaSetConfiguration(cc_view.get_raw(), pc_view.get_raw())
315 }
316 .map_err(crate::Error::from)
317 }
318 #[cfg_attr(
319 feature = "tracing",
320 tracing::instrument(skip_all, level = "debug", ret, err)
321 )]
322 async fn wait_for_catch_up_quorum(
323 &self,
324 catchupmode: ReplicaSetQuorumMode,
325 cancellation_token: BoxedCancelToken,
326 ) -> crate::Result<()> {
327 let com1 = &self.com_impl;
328 let com2 = self.com_impl.clone();
329 let rx = fabric_begin_end_proxy(
330 move |callback| unsafe { com1.BeginWaitForCatchUpQuorum(catchupmode.into(), callback) },
331 move |ctx| unsafe { com2.EndWaitForCatchUpQuorum(ctx) },
332 Some(cancellation_token),
333 );
334 rx.await?.map_err(crate::Error::from)
335 }
336 #[cfg_attr(
337 feature = "tracing",
338 tracing::instrument(skip_all, level = "debug", ret, err)
339 )]
340 fn update_current_replica_set_configuration(
341 &self,
342 currentconfiguration: ReplicaSetConfig,
343 ) -> crate::Result<()> {
344 unsafe {
345 self.com_impl
346 .UpdateCurrentReplicaSetConfiguration(currentconfiguration.get_view().get_raw())
347 }
348 .map_err(crate::Error::from)
349 }
350 #[cfg_attr(
351 feature = "tracing",
352 tracing::instrument(skip_all, level = "debug", ret, err)
353 )]
354 async fn build_replica(
355 &self,
356 replica: ReplicaInformation,
357 cancellation_token: BoxedCancelToken,
358 ) -> crate::Result<()> {
359 let com1 = &self.com_impl;
360 let com2 = self.com_impl.clone();
361 let rx = fabric_begin_end_proxy(
362 move |callback| {
363 let (mut info, ex1) = replica.get_raw_parts();
364 info.Reserved = std::ptr::addr_of!(ex1) as *mut c_void;
365 unsafe { com1.BeginBuildReplica(&info, callback) }
366 },
367 move |ctx| unsafe { com2.EndBuildReplica(ctx) },
368 Some(cancellation_token),
369 );
370 rx.await?.map_err(crate::Error::from)
371 }
372 #[cfg_attr(
373 feature = "tracing",
374 tracing::instrument(skip_all, level = "debug", ret, err)
375 )]
376 fn remove_replica(&self, replicaid: i64) -> crate::Result<()> {
377 unsafe { self.com_impl.RemoveReplica(replicaid) }.map_err(crate::Error::from)
378 }
379}
380
381#[derive(Debug, Clone)]
383pub struct StatefulServicePartition {
384 com_impl: IFabricStatefulServicePartition3,
385}
386
387impl super::IStatefulServicePartition for StatefulServicePartition {
388 fn get_partition_information(&self) -> crate::Result<ServicePartitionInformation> {
389 unsafe { self.com_impl.GetPartitionInfo()?.as_ref() }
390 .ok_or(ErrorCode::E_POINTER.into())
391 .map(ServicePartitionInformation::from)
392 }
393
394 fn get_read_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
395 unsafe { self.com_impl.GetReadStatus() }
396 .map(ServicePartitionAccessStatus::from)
397 .map_err(crate::Error::from)
398 }
399
400 fn get_write_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
401 unsafe { self.com_impl.GetWriteStatus() }
402 .map(ServicePartitionAccessStatus::from)
403 .map_err(crate::Error::from)
404 }
405
406 fn create_replicator(&self) -> crate::Result<Box<dyn IPrimaryReplicator>> {
408 Err(ErrorCode::E_NOTIMPL.into())
409 }
410
411 fn report_load(&self, metrics: &[LoadMetric]) -> crate::Result<()> {
412 let metrics_ref = LoadMetricListRef::from_slice(metrics);
413 let raw = metrics_ref.as_raw_slice();
414 unsafe { self.com_impl.ReportLoad(raw) }.map_err(crate::Error::from)
415 }
416
417 fn report_fault(&self, fault_type: FaultType) -> crate::Result<()> {
418 unsafe { self.com_impl.ReportFault(fault_type.into()) }.map_err(crate::Error::from)
419 }
420
421 fn report_move_cost(&self, move_cost: MoveCost) -> crate::Result<()> {
422 unsafe { self.com_impl.ReportMoveCost(move_cost.into()) }.map_err(crate::Error::from)
423 }
424
425 fn report_partition_health(&self, healthinfo: &HealthInformation) -> crate::Result<()> {
426 let healthinfo_ref = &healthinfo.into();
427 unsafe { self.com_impl.ReportPartitionHealth(healthinfo_ref) }.map_err(crate::Error::from)
428 }
429
430 fn report_replica_health(&self, healthinfo: &HealthInformation) -> crate::Result<()> {
431 let healthinfo_ref = &healthinfo.into();
432 unsafe { self.com_impl.ReportReplicaHealth(healthinfo_ref) }.map_err(crate::Error::from)
433 }
434
435 fn try_get_com(
436 &self,
437 ) -> crate::Result<&mssf_com::FabricRuntime::IFabricStatefulServicePartition> {
438 Ok(&self.com_impl)
439 }
440}
441
442impl From<&IFabricStatefulServicePartition3> for StatefulServicePartition {
443 fn from(e: &IFabricStatefulServicePartition3) -> Self {
444 StatefulServicePartition {
445 com_impl: e.clone(),
446 }
447 }
448}