1use std::ffi::c_void;
10
11use crate::{Interface, WString, runtime::executor::BoxedCancelToken, strings::StringResult};
12use mssf_com::FabricRuntime::{
13 IFabricPrimaryReplicator, IFabricReplicator, IFabricReplicatorCatchupSpecificQuorum,
14 IFabricStatefulServicePartition3, IFabricStatefulServiceReplica,
15};
16
17use crate::{
18 error::ErrorCode,
19 sync::fabric_begin_end_proxy,
20 types::{
21 FaultType, HealthInformation, LoadMetric, LoadMetricListRef, MoveCost, ReplicaRole,
22 ServicePartitionAccessStatus, ServicePartitionInformation,
23 },
24};
25
26use super::stateful::{PrimaryReplicator, Replicator, StatefulServiceReplica};
27use crate::types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuorumMode};
28
29pub struct StatefulServiceReplicaProxy {
30 com_impl: IFabricStatefulServiceReplica,
31}
32
33impl StatefulServiceReplicaProxy {
34 pub fn new(com_impl: IFabricStatefulServiceReplica) -> StatefulServiceReplicaProxy {
35 StatefulServiceReplicaProxy { com_impl }
36 }
37}
38
39impl StatefulServiceReplica for StatefulServiceReplicaProxy {
40 #[cfg_attr(
41 feature = "tracing",
42 tracing::instrument(skip_all, level = "debug", err) )]
44 async fn open(
45 &self,
46 openmode: OpenMode,
47 partition: StatefulServicePartition,
48 cancellation_token: BoxedCancelToken,
49 ) -> crate::Result<impl PrimaryReplicator> {
50 let com1 = &self.com_impl;
51 let com2 = self.com_impl.clone();
52 let rx = fabric_begin_end_proxy(
53 move |callback| unsafe {
54 com1.BeginOpen(openmode.into(), partition.get_com(), callback)
55 },
56 move |ctx| unsafe { com2.EndOpen(ctx) },
57 Some(cancellation_token),
58 );
59 let rplctr = rx.await??;
60
61 let catchup_specific_quorum = rplctr
63 .cast::<IFabricReplicatorCatchupSpecificQuorum>()
64 .is_ok();
65 assert!(
66 catchup_specific_quorum,
67 "mssf does not support replicator without catchup_specific_quorum interface"
68 );
69
70 let p_rplctr: IFabricPrimaryReplicator = rplctr.clone().cast().unwrap(); let res = PrimaryReplicatorProxy::new(p_rplctr);
75 Ok(res)
76 }
77
78 #[cfg_attr(
79 feature = "tracing",
80 tracing::instrument(skip_all, level = "debug", ret, err)
81 )]
82 async fn change_role(
83 &self,
84 newrole: ReplicaRole,
85 cancellation_token: BoxedCancelToken,
86 ) -> crate::Result<WString> {
87 let com1 = &self.com_impl;
89 let com2 = self.com_impl.clone();
90 let rx = fabric_begin_end_proxy(
91 move |callback| unsafe { com1.BeginChangeRole((&newrole).into(), callback) },
92 move |ctx| unsafe { com2.EndChangeRole(ctx) },
93 Some(cancellation_token),
94 );
95 let addr = rx.await??;
96 Ok(StringResult::from(&addr).into_inner())
97 }
98
99 #[cfg_attr(
100 feature = "tracing",
101 tracing::instrument(skip_all, level = "debug", ret, err)
102 )]
103 async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
104 let com1 = &self.com_impl;
105 let com2 = self.com_impl.clone();
106 let rx = fabric_begin_end_proxy(
107 move |callback| unsafe { com1.BeginClose(callback) },
108 move |ctx| unsafe { com2.EndClose(ctx) },
109 Some(cancellation_token),
110 );
111 rx.await?.map_err(crate::Error::from)
112 }
113 #[cfg_attr(
114 feature = "tracing",
115 tracing::instrument(skip_all, level = "debug", ret)
116 )]
117 fn abort(&self) {
118 unsafe { self.com_impl.Abort() }
119 }
120}
121
122pub struct ReplicatorProxy {
123 com_impl: IFabricReplicator,
124}
125
126impl ReplicatorProxy {
127 fn new(com_impl: IFabricReplicator) -> ReplicatorProxy {
128 ReplicatorProxy { com_impl }
129 }
130}
131
132impl Replicator for ReplicatorProxy {
133 #[cfg_attr(
134 feature = "tracing",
135 tracing::instrument(skip_all, level = "debug", ret, err)
136 )]
137 async fn open(&self, cancellation_token: BoxedCancelToken) -> crate::Result<WString> {
138 let com1 = &self.com_impl;
140 let com2 = self.com_impl.clone();
141 let rx = fabric_begin_end_proxy(
142 move |callback| unsafe { com1.BeginOpen(callback) },
143 move |ctx| unsafe { com2.EndOpen(ctx) },
144 Some(cancellation_token),
145 );
146 let addr = rx.await??;
147 Ok(StringResult::from(&addr).into_inner())
148 }
149 #[cfg_attr(
150 feature = "tracing",
151 tracing::instrument(skip_all, level = "debug", ret, err)
152 )]
153 async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
154 let com1 = &self.com_impl;
155 let com2 = self.com_impl.clone();
156 let rx = fabric_begin_end_proxy(
157 move |callback| unsafe { com1.BeginClose(callback) },
158 move |ctx| unsafe { com2.EndClose(ctx) },
159 Some(cancellation_token),
160 );
161 rx.await?.map_err(crate::Error::from)
162 }
163 #[cfg_attr(
164 feature = "tracing",
165 tracing::instrument(skip_all, level = "debug", ret, err)
166 )]
167 async fn change_role(
168 &self,
169 epoch: Epoch,
170 role: ReplicaRole,
171 cancellation_token: BoxedCancelToken,
172 ) -> crate::Result<()> {
173 let com1 = &self.com_impl;
174 let com2 = self.com_impl.clone();
175 let fabric_epoch: mssf_com::FabricTypes::FABRIC_EPOCH = (&epoch).into();
176 let rx = fabric_begin_end_proxy(
177 move |callback| unsafe {
178 com1.BeginChangeRole(&fabric_epoch, (&role).into(), callback)
179 },
180 move |ctx| unsafe { com2.EndChangeRole(ctx) },
181 Some(cancellation_token),
182 );
183 rx.await?.map_err(crate::Error::from)
184 }
185 #[cfg_attr(
186 feature = "tracing",
187 tracing::instrument(skip_all, level = "debug", ret, err)
188 )]
189 async fn update_epoch(
190 &self,
191 epoch: Epoch,
192 cancellation_token: BoxedCancelToken,
193 ) -> crate::Result<()> {
194 let com1 = &self.com_impl;
195 let com2 = self.com_impl.clone();
196 let fabric_epoch: mssf_com::FabricTypes::FABRIC_EPOCH = (&epoch).into();
197 let rx = fabric_begin_end_proxy(
198 move |callback| unsafe { com1.BeginUpdateEpoch(&fabric_epoch, callback) },
199 move |ctx| unsafe { com2.EndUpdateEpoch(ctx) },
200 Some(cancellation_token),
201 );
202 rx.await?.map_err(crate::Error::from)
203 }
204 #[cfg_attr(
205 feature = "tracing",
206 tracing::instrument(skip_all, level = "debug", ret, err)
207 )]
208 #[cfg_attr(
209 feature = "tracing",
210 tracing::instrument(skip_all, level = "debug", ret, err)
211 )]
212 fn get_current_progress(&self) -> crate::Result<i64> {
213 unsafe { self.com_impl.GetCurrentProgress() }.map_err(crate::Error::from)
214 }
215 #[cfg_attr(
216 feature = "tracing",
217 tracing::instrument(skip_all, level = "debug", ret, err)
218 )]
219 fn get_catch_up_capability(&self) -> crate::Result<i64> {
220 unsafe { self.com_impl.GetCatchUpCapability() }.map_err(crate::Error::from)
221 }
222 #[cfg_attr(
223 feature = "tracing",
224 tracing::instrument(skip_all, level = "debug", ret)
225 )]
226 fn abort(&self) {
227 unsafe { self.com_impl.Abort() }
228 }
229}
230
231pub struct PrimaryReplicatorProxy {
232 com_impl: IFabricPrimaryReplicator,
233 parent: ReplicatorProxy,
234}
235
236impl PrimaryReplicatorProxy {
237 pub fn new(com_impl: IFabricPrimaryReplicator) -> PrimaryReplicatorProxy {
238 let parent = ReplicatorProxy::new(com_impl.clone().cast().unwrap());
239 PrimaryReplicatorProxy { com_impl, parent }
240 }
241}
242
243impl Replicator for PrimaryReplicatorProxy {
244 async fn open(&self, cancellation_token: BoxedCancelToken) -> crate::Result<WString> {
245 self.parent.open(cancellation_token).await
246 }
247 async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
248 self.parent.close(cancellation_token).await
249 }
250 async fn change_role(
251 &self,
252 epoch: Epoch,
253 role: ReplicaRole,
254 cancellation_token: BoxedCancelToken,
255 ) -> crate::Result<()> {
256 self.parent
257 .change_role(epoch, role, cancellation_token)
258 .await
259 }
260 async fn update_epoch(
261 &self,
262 epoch: Epoch,
263 cancellation_token: BoxedCancelToken,
264 ) -> crate::Result<()> {
265 self.parent.update_epoch(epoch, cancellation_token).await
266 }
267 fn get_current_progress(&self) -> crate::Result<i64> {
268 self.parent.get_current_progress()
269 }
270 fn get_catch_up_capability(&self) -> crate::Result<i64> {
271 self.parent.get_catch_up_capability()
272 }
273 fn abort(&self) {
274 self.parent.abort()
275 }
276}
277
278impl PrimaryReplicator for PrimaryReplicatorProxy {
279 #[cfg_attr(
280 feature = "tracing",
281 tracing::instrument(skip_all, level = "debug", ret, err)
282 )]
283 async fn on_data_loss(&self, cancellation_token: BoxedCancelToken) -> crate::Result<u8> {
284 let com1 = &self.com_impl;
285 let com2 = self.com_impl.clone();
286 let rx = fabric_begin_end_proxy(
287 move |callback| unsafe { com1.BeginOnDataLoss(callback) },
288 move |ctx| unsafe { com2.EndOnDataLoss(ctx) },
289 Some(cancellation_token),
290 );
291 rx.await?.map_err(crate::Error::from)
292 }
293 #[cfg_attr(
294 feature = "tracing",
295 tracing::instrument(skip_all, level = "debug", ret, err)
296 )]
297 fn update_catch_up_replica_set_configuration(
298 &self,
299 currentconfiguration: ReplicaSetConfig,
300 previousconfiguration: ReplicaSetConfig,
301 ) -> crate::Result<()> {
302 let cc_view = currentconfiguration.get_view();
303 let pc_view = previousconfiguration.get_view();
304 unsafe {
305 self.com_impl
306 .UpdateCatchUpReplicaSetConfiguration(cc_view.get_raw(), pc_view.get_raw())
307 }
308 .map_err(crate::Error::from)
309 }
310 #[cfg_attr(
311 feature = "tracing",
312 tracing::instrument(skip_all, level = "debug", ret, err)
313 )]
314 async fn wait_for_catch_up_quorum(
315 &self,
316 catchupmode: ReplicaSetQuorumMode,
317 cancellation_token: BoxedCancelToken,
318 ) -> crate::Result<()> {
319 let com1 = &self.com_impl;
320 let com2 = self.com_impl.clone();
321 let rx = fabric_begin_end_proxy(
322 move |callback| unsafe { com1.BeginWaitForCatchUpQuorum(catchupmode.into(), callback) },
323 move |ctx| unsafe { com2.EndWaitForCatchUpQuorum(ctx) },
324 Some(cancellation_token),
325 );
326 rx.await?.map_err(crate::Error::from)
327 }
328 #[cfg_attr(
329 feature = "tracing",
330 tracing::instrument(skip_all, level = "debug", ret, err)
331 )]
332 fn update_current_replica_set_configuration(
333 &self,
334 currentconfiguration: ReplicaSetConfig,
335 ) -> crate::Result<()> {
336 unsafe {
337 self.com_impl
338 .UpdateCurrentReplicaSetConfiguration(currentconfiguration.get_view().get_raw())
339 }
340 .map_err(crate::Error::from)
341 }
342 #[cfg_attr(
343 feature = "tracing",
344 tracing::instrument(skip_all, level = "debug", ret, err)
345 )]
346 async fn build_replica(
347 &self,
348 replica: ReplicaInformation,
349 cancellation_token: BoxedCancelToken,
350 ) -> crate::Result<()> {
351 let com1 = &self.com_impl;
352 let com2 = self.com_impl.clone();
353 let rx = fabric_begin_end_proxy(
354 move |callback| {
355 let (mut info, ex1) = replica.get_raw_parts();
356 info.Reserved = std::ptr::addr_of!(ex1) as *mut c_void;
357 unsafe { com1.BeginBuildReplica(&info, callback) }
358 },
359 move |ctx| unsafe { com2.EndBuildReplica(ctx) },
360 Some(cancellation_token),
361 );
362 rx.await?.map_err(crate::Error::from)
363 }
364 #[cfg_attr(
365 feature = "tracing",
366 tracing::instrument(skip_all, level = "debug", ret, err)
367 )]
368 fn remove_replica(&self, replicaid: i64) -> crate::Result<()> {
369 unsafe { self.com_impl.RemoveReplica(replicaid) }.map_err(crate::Error::from)
370 }
371}
372
373#[derive(Debug, Clone)]
375pub struct StatefulServicePartition {
376 com_impl: IFabricStatefulServicePartition3,
377}
378
379impl StatefulServicePartition {
380 pub fn get_com(&self) -> &IFabricStatefulServicePartition3 {
381 &self.com_impl
382 }
383
384 pub fn get_partition_information(&self) -> crate::Result<ServicePartitionInformation> {
386 unsafe { self.com_impl.GetPartitionInfo()?.as_ref() }
387 .ok_or(ErrorCode::E_POINTER.into())
388 .map(ServicePartitionInformation::from)
389 }
390
391 pub fn get_read_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
394 unsafe { self.com_impl.GetReadStatus() }
395 .map(ServicePartitionAccessStatus::from)
396 .map_err(crate::Error::from)
397 }
398
399 pub fn get_write_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
402 unsafe { self.com_impl.GetWriteStatus() }
403 .map(ServicePartitionAccessStatus::from)
404 .map_err(crate::Error::from)
405 }
406
407 pub fn create_replicator(&self) -> crate::Result<()> {
410 Err(ErrorCode::E_NOTIMPL.into())
411 }
412
413 pub fn report_load(&self, metrics: &[LoadMetric]) -> crate::Result<()> {
420 let metrics_ref = LoadMetricListRef::from_slice(metrics);
421 let raw = metrics_ref.as_raw_slice();
422 unsafe { self.com_impl.ReportLoad(raw) }.map_err(crate::Error::from)
423 }
424
425 pub fn report_fault(&self, fault_type: FaultType) -> crate::Result<()> {
428 unsafe { self.com_impl.ReportFault(fault_type.into()) }.map_err(crate::Error::from)
429 }
430
431 pub fn report_move_cost(&self, move_cost: MoveCost) -> crate::Result<()> {
438 unsafe { self.com_impl.ReportMoveCost(move_cost.into()) }.map_err(crate::Error::from)
439 }
440
441 pub fn report_partition_health(&self, healthinfo: &HealthInformation) -> crate::Result<()> {
450 let healthinfo_ref = &healthinfo.into();
451 unsafe { self.com_impl.ReportPartitionHealth(healthinfo_ref) }.map_err(crate::Error::from)
452 }
453
454 pub fn report_replica_health(&self, healthinfo: &HealthInformation) -> crate::Result<()> {
456 let healthinfo_ref = &healthinfo.into();
457 unsafe { self.com_impl.ReportReplicaHealth(healthinfo_ref) }.map_err(crate::Error::from)
458 }
459}
460
461impl From<&IFabricStatefulServicePartition3> for StatefulServicePartition {
462 fn from(e: &IFabricStatefulServicePartition3) -> Self {
463 StatefulServicePartition {
464 com_impl: e.clone(),
465 }
466 }
467}