1use std::ffi::c_void;
10
11use crate::{Interface, WString, runtime::executor::BoxedCancelToken};
12use mssf_com::FabricRuntime::{
13 IFabricPrimaryReplicator, IFabricReplicator, IFabricReplicatorCatchupSpecificQuorum,
14 IFabricStatefulServicePartition3, IFabricStatefulServiceReplica,
15};
16
17use crate::{
18 error::ErrorCode,
19 strings::WStringWrap,
20 sync::fabric_begin_end_proxy,
21 types::{
22 FaultType, HealthInformation, LoadMetric, LoadMetricListRef, MoveCost, ReplicaRole,
23 ServicePartitionAccessStatus, ServicePartitionInformation,
24 },
25};
26
27use super::stateful::{PrimaryReplicator, Replicator, StatefulServiceReplica};
28use crate::types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuorumMode};
29
30pub struct StatefulServiceReplicaProxy {
31 com_impl: IFabricStatefulServiceReplica,
32}
33
34impl StatefulServiceReplicaProxy {
35 pub fn new(com_impl: IFabricStatefulServiceReplica) -> StatefulServiceReplicaProxy {
36 StatefulServiceReplicaProxy { com_impl }
37 }
38}
39
40impl StatefulServiceReplica for StatefulServiceReplicaProxy {
41 #[cfg_attr(
42 feature = "tracing",
43 tracing::instrument(skip_all, level = "debug", err) )]
45 async fn open(
46 &self,
47 openmode: OpenMode,
48 partition: &StatefulServicePartition,
49 cancellation_token: BoxedCancelToken,
50 ) -> crate::Result<impl PrimaryReplicator> {
51 let com1 = &self.com_impl;
52 let com2 = self.com_impl.clone();
53 let rx = fabric_begin_end_proxy(
54 move |callback| unsafe {
55 com1.BeginOpen(openmode.into(), partition.get_com(), callback)
56 },
57 move |ctx| unsafe { com2.EndOpen(ctx) },
58 Some(cancellation_token),
59 );
60 let rplctr = rx.await??;
61
62 let catchup_specific_quorum = rplctr
64 .cast::<IFabricReplicatorCatchupSpecificQuorum>()
65 .is_ok();
66 assert!(
67 catchup_specific_quorum,
68 "mssf does not support replicator without catchup_specific_quorum interface"
69 );
70
71 let p_rplctr: IFabricPrimaryReplicator = rplctr.clone().cast().unwrap(); let res = PrimaryReplicatorProxy::new(p_rplctr);
76 Ok(res)
77 }
78
79 #[cfg_attr(
80 feature = "tracing",
81 tracing::instrument(skip_all, level = "debug", ret, err)
82 )]
83 async fn change_role(
84 &self,
85 newrole: ReplicaRole,
86 cancellation_token: BoxedCancelToken,
87 ) -> crate::Result<WString> {
88 let com1 = &self.com_impl;
90 let com2 = self.com_impl.clone();
91 let rx = fabric_begin_end_proxy(
92 move |callback| unsafe { com1.BeginChangeRole((&newrole).into(), callback) },
93 move |ctx| unsafe { com2.EndChangeRole(ctx) },
94 Some(cancellation_token),
95 );
96 let addr = rx.await??;
97 Ok(WStringWrap::from(&addr).into())
98 }
99
100 #[cfg_attr(
101 feature = "tracing",
102 tracing::instrument(skip_all, level = "debug", ret, err)
103 )]
104 async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
105 let com1 = &self.com_impl;
106 let com2 = self.com_impl.clone();
107 let rx = fabric_begin_end_proxy(
108 move |callback| unsafe { com1.BeginClose(callback) },
109 move |ctx| unsafe { com2.EndClose(ctx) },
110 Some(cancellation_token),
111 );
112 rx.await?.map_err(crate::Error::from)
113 }
114 #[cfg_attr(
115 feature = "tracing",
116 tracing::instrument(skip_all, level = "debug", ret)
117 )]
118 fn abort(&self) {
119 unsafe { self.com_impl.Abort() }
120 }
121}
122
123pub struct ReplicatorProxy {
124 com_impl: IFabricReplicator,
125}
126
127impl ReplicatorProxy {
128 fn new(com_impl: IFabricReplicator) -> ReplicatorProxy {
129 ReplicatorProxy { com_impl }
130 }
131}
132
133impl Replicator for ReplicatorProxy {
134 #[cfg_attr(
135 feature = "tracing",
136 tracing::instrument(skip_all, level = "debug", ret, err)
137 )]
138 async fn open(&self, cancellation_token: BoxedCancelToken) -> crate::Result<WString> {
139 let com1 = &self.com_impl;
141 let com2 = self.com_impl.clone();
142 let rx = fabric_begin_end_proxy(
143 move |callback| unsafe { com1.BeginOpen(callback) },
144 move |ctx| unsafe { com2.EndOpen(ctx) },
145 Some(cancellation_token),
146 );
147 let addr = rx.await??;
148 Ok(WStringWrap::from(&addr).into())
149 }
150 #[cfg_attr(
151 feature = "tracing",
152 tracing::instrument(skip_all, level = "debug", ret, err)
153 )]
154 async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
155 let com1 = &self.com_impl;
156 let com2 = self.com_impl.clone();
157 let rx = fabric_begin_end_proxy(
158 move |callback| unsafe { com1.BeginClose(callback) },
159 move |ctx| unsafe { com2.EndClose(ctx) },
160 Some(cancellation_token),
161 );
162 rx.await?.map_err(crate::Error::from)
163 }
164 #[cfg_attr(
165 feature = "tracing",
166 tracing::instrument(skip_all, level = "debug", ret, err)
167 )]
168 async fn change_role(
169 &self,
170 epoch: &Epoch,
171 role: &ReplicaRole,
172 cancellation_token: BoxedCancelToken,
173 ) -> crate::Result<()> {
174 let com1 = &self.com_impl;
175 let com2 = self.com_impl.clone();
176 let rx = fabric_begin_end_proxy(
177 move |callback| unsafe { com1.BeginChangeRole(&epoch.into(), role.into(), callback) },
178 move |ctx| unsafe { com2.EndChangeRole(ctx) },
179 Some(cancellation_token),
180 );
181 rx.await?.map_err(crate::Error::from)
182 }
183 #[cfg_attr(
184 feature = "tracing",
185 tracing::instrument(skip_all, level = "debug", ret, err)
186 )]
187 async fn update_epoch(
188 &self,
189 epoch: &Epoch,
190 cancellation_token: BoxedCancelToken,
191 ) -> crate::Result<()> {
192 let com1 = &self.com_impl;
193 let com2 = self.com_impl.clone();
194 let rx = fabric_begin_end_proxy(
195 move |callback| unsafe { com1.BeginUpdateEpoch(&epoch.into(), callback) },
196 move |ctx| unsafe { com2.EndUpdateEpoch(ctx) },
197 Some(cancellation_token),
198 );
199 rx.await?.map_err(crate::Error::from)
200 }
201 #[cfg_attr(
202 feature = "tracing",
203 tracing::instrument(skip_all, level = "debug", ret, err)
204 )]
205 #[cfg_attr(
206 feature = "tracing",
207 tracing::instrument(skip_all, level = "debug", ret, err)
208 )]
209 fn get_current_progress(&self) -> crate::Result<i64> {
210 unsafe { self.com_impl.GetCurrentProgress() }.map_err(crate::Error::from)
211 }
212 #[cfg_attr(
213 feature = "tracing",
214 tracing::instrument(skip_all, level = "debug", ret, err)
215 )]
216 fn get_catch_up_capability(&self) -> crate::Result<i64> {
217 unsafe { self.com_impl.GetCatchUpCapability() }.map_err(crate::Error::from)
218 }
219 #[cfg_attr(
220 feature = "tracing",
221 tracing::instrument(skip_all, level = "debug", ret)
222 )]
223 fn abort(&self) {
224 unsafe { self.com_impl.Abort() }
225 }
226}
227
228pub struct PrimaryReplicatorProxy {
229 com_impl: IFabricPrimaryReplicator,
230 parent: ReplicatorProxy,
231}
232
233impl PrimaryReplicatorProxy {
234 pub fn new(com_impl: IFabricPrimaryReplicator) -> PrimaryReplicatorProxy {
235 let parent = ReplicatorProxy::new(com_impl.clone().cast().unwrap());
236 PrimaryReplicatorProxy { com_impl, parent }
237 }
238}
239
240impl Replicator for PrimaryReplicatorProxy {
241 async fn open(&self, cancellation_token: BoxedCancelToken) -> crate::Result<WString> {
242 self.parent.open(cancellation_token).await
243 }
244 async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
245 self.parent.close(cancellation_token).await
246 }
247 async fn change_role(
248 &self,
249 epoch: &Epoch,
250 role: &ReplicaRole,
251 cancellation_token: BoxedCancelToken,
252 ) -> crate::Result<()> {
253 self.parent
254 .change_role(epoch, role, cancellation_token)
255 .await
256 }
257 async fn update_epoch(
258 &self,
259 epoch: &Epoch,
260 cancellation_token: BoxedCancelToken,
261 ) -> crate::Result<()> {
262 self.parent.update_epoch(epoch, cancellation_token).await
263 }
264 fn get_current_progress(&self) -> crate::Result<i64> {
265 self.parent.get_current_progress()
266 }
267 fn get_catch_up_capability(&self) -> crate::Result<i64> {
268 self.parent.get_catch_up_capability()
269 }
270 fn abort(&self) {
271 self.parent.abort()
272 }
273}
274
275impl PrimaryReplicator for PrimaryReplicatorProxy {
276 #[cfg_attr(
277 feature = "tracing",
278 tracing::instrument(skip_all, level = "debug", ret, err)
279 )]
280 async fn on_data_loss(&self, cancellation_token: BoxedCancelToken) -> crate::Result<u8> {
281 let com1 = &self.com_impl;
282 let com2 = self.com_impl.clone();
283 let rx = fabric_begin_end_proxy(
284 move |callback| unsafe { com1.BeginOnDataLoss(callback) },
285 move |ctx| unsafe { com2.EndOnDataLoss(ctx) },
286 Some(cancellation_token),
287 );
288 rx.await?.map_err(crate::Error::from)
289 }
290 #[cfg_attr(
291 feature = "tracing",
292 tracing::instrument(skip_all, level = "debug", ret, err)
293 )]
294 fn update_catch_up_replica_set_configuration(
295 &self,
296 currentconfiguration: &ReplicaSetConfig,
297 previousconfiguration: &ReplicaSetConfig,
298 ) -> crate::Result<()> {
299 let cc_view = currentconfiguration.get_view();
300 let pc_view = previousconfiguration.get_view();
301 unsafe {
302 self.com_impl
303 .UpdateCatchUpReplicaSetConfiguration(cc_view.get_raw(), pc_view.get_raw())
304 }
305 .map_err(crate::Error::from)
306 }
307 #[cfg_attr(
308 feature = "tracing",
309 tracing::instrument(skip_all, level = "debug", ret, err)
310 )]
311 async fn wait_for_catch_up_quorum(
312 &self,
313 catchupmode: ReplicaSetQuorumMode,
314 cancellation_token: BoxedCancelToken,
315 ) -> crate::Result<()> {
316 let com1 = &self.com_impl;
317 let com2 = self.com_impl.clone();
318 let rx = fabric_begin_end_proxy(
319 move |callback| unsafe { com1.BeginWaitForCatchUpQuorum(catchupmode.into(), callback) },
320 move |ctx| unsafe { com2.EndWaitForCatchUpQuorum(ctx) },
321 Some(cancellation_token),
322 );
323 rx.await?.map_err(crate::Error::from)
324 }
325 #[cfg_attr(
326 feature = "tracing",
327 tracing::instrument(skip_all, level = "debug", ret, err)
328 )]
329 fn update_current_replica_set_configuration(
330 &self,
331 currentconfiguration: &ReplicaSetConfig,
332 ) -> crate::Result<()> {
333 unsafe {
334 self.com_impl
335 .UpdateCurrentReplicaSetConfiguration(currentconfiguration.get_view().get_raw())
336 }
337 .map_err(crate::Error::from)
338 }
339 #[cfg_attr(
340 feature = "tracing",
341 tracing::instrument(skip_all, level = "debug", ret, err)
342 )]
343 async fn build_replica(
344 &self,
345 replica: &ReplicaInformation,
346 cancellation_token: BoxedCancelToken,
347 ) -> crate::Result<()> {
348 let com1 = &self.com_impl;
349 let com2 = self.com_impl.clone();
350 let rx = fabric_begin_end_proxy(
351 move |callback| {
352 let (mut info, ex1) = replica.get_raw_parts();
353 info.Reserved = std::ptr::addr_of!(ex1) as *mut c_void;
354 unsafe { com1.BeginBuildReplica(&info, callback) }
355 },
356 move |ctx| unsafe { com2.EndBuildReplica(ctx) },
357 Some(cancellation_token),
358 );
359 rx.await?.map_err(crate::Error::from)
360 }
361 #[cfg_attr(
362 feature = "tracing",
363 tracing::instrument(skip_all, level = "debug", ret, err)
364 )]
365 fn remove_replica(&self, replicaid: i64) -> crate::Result<()> {
366 unsafe { self.com_impl.RemoveReplica(replicaid) }.map_err(crate::Error::from)
367 }
368}
369
370#[derive(Debug, Clone)]
372pub struct StatefulServicePartition {
373 com_impl: IFabricStatefulServicePartition3,
374}
375
376impl StatefulServicePartition {
377 pub fn get_com(&self) -> &IFabricStatefulServicePartition3 {
378 &self.com_impl
379 }
380
381 pub fn get_partition_information(&self) -> crate::Result<ServicePartitionInformation> {
383 unsafe { self.com_impl.GetPartitionInfo()?.as_ref() }
384 .ok_or(ErrorCode::E_POINTER.into())
385 .map(ServicePartitionInformation::from)
386 }
387
388 pub fn get_read_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
391 unsafe { self.com_impl.GetReadStatus() }
392 .map(ServicePartitionAccessStatus::from)
393 .map_err(crate::Error::from)
394 }
395
396 pub fn get_write_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
399 unsafe { self.com_impl.GetWriteStatus() }
400 .map(ServicePartitionAccessStatus::from)
401 .map_err(crate::Error::from)
402 }
403
404 pub fn create_replicator(&self) -> crate::Result<()> {
407 Err(ErrorCode::E_NOTIMPL.into())
408 }
409
410 pub fn report_load(&self, metrics: &[LoadMetric]) -> crate::Result<()> {
417 let metrics_ref = LoadMetricListRef::from_slice(metrics);
418 let raw = metrics_ref.as_raw_slice();
419 unsafe { self.com_impl.ReportLoad(raw) }.map_err(crate::Error::from)
420 }
421
422 pub fn report_fault(&self, fault_type: FaultType) -> crate::Result<()> {
425 unsafe { self.com_impl.ReportFault(fault_type.into()) }.map_err(crate::Error::from)
426 }
427
428 pub fn report_move_cost(&self, move_cost: MoveCost) -> crate::Result<()> {
435 unsafe { self.com_impl.ReportMoveCost(move_cost.into()) }.map_err(crate::Error::from)
436 }
437
438 pub fn report_partition_health(&self, healthinfo: &HealthInformation) -> crate::Result<()> {
447 let healthinfo_ref = &healthinfo.into();
448 unsafe { self.com_impl.ReportPartitionHealth(healthinfo_ref) }.map_err(crate::Error::from)
449 }
450
451 pub fn report_replica_health(&self, healthinfo: &HealthInformation) -> crate::Result<()> {
453 let healthinfo_ref = &healthinfo.into();
454 unsafe { self.com_impl.ReportReplicaHealth(healthinfo_ref) }.map_err(crate::Error::from)
455 }
456}
457
458impl From<&IFabricStatefulServicePartition3> for StatefulServicePartition {
459 fn from(e: &IFabricStatefulServicePartition3) -> Self {
460 StatefulServicePartition {
461 com_impl: e.clone(),
462 }
463 }
464}