mssf_core/runtime/
store_proxy.rs1use crate::{PCWSTR, runtime::executor::BoxedCancelToken};
7use mssf_com::{
8 FabricRuntime::{
9 IFabricKeyValueStoreItemResult, IFabricKeyValueStoreReplica2, IFabricTransaction,
10 },
11 FabricTypes::{FABRIC_KEY_VALUE_STORE_ITEM, FABRIC_KEY_VALUE_STORE_ITEM_METADATA},
12};
13
14use crate::sync::fabric_begin_end_proxy;
15
16use crate::types::TransactionIsolationLevel;
17
18#[derive(Clone)]
20pub struct KVStoreProxy {
21 com_impl: IFabricKeyValueStoreReplica2,
22}
23
24pub struct TransactionProxy {
25 com_impl: IFabricTransaction,
26}
27
28pub struct KVStoreItemProxy {
29 com_impl: IFabricKeyValueStoreItemResult,
30}
31
32impl KVStoreItemProxy {
33 pub fn key(&self) -> &[u16] {
34 let item = self.get_item_inner();
35 let meta = Self::get_meta_inner(item);
36 unsafe { meta.Key.as_wide() }
37 }
38
39 pub fn val(&self) -> &[u8] {
40 let item = self.get_item_inner();
41 let meta = Self::get_meta_inner(item);
42 unsafe { std::slice::from_raw_parts(item.Value, meta.ValueSizeInBytes as usize) }
43 }
44
45 fn get_item_inner(&self) -> &FABRIC_KEY_VALUE_STORE_ITEM {
46 unsafe { self.com_impl.get_Item().as_ref().unwrap() }
47 }
48 fn get_meta_inner(item: &FABRIC_KEY_VALUE_STORE_ITEM) -> &FABRIC_KEY_VALUE_STORE_ITEM_METADATA {
49 unsafe { item.Metadata.as_ref().unwrap() }
50 }
51}
52
53impl KVStoreProxy {
54 pub fn new(com_impl: IFabricKeyValueStoreReplica2) -> KVStoreProxy {
55 KVStoreProxy { com_impl }
56 }
57
58 pub fn create_transaction(&self) -> crate::Result<TransactionProxy> {
59 let tx = unsafe { self.com_impl.CreateTransaction() }?;
60 Ok(TransactionProxy { com_impl: tx })
61 }
62
63 pub fn add(&self, tx: &TransactionProxy, key: &[u16], value: &[u8]) -> crate::Result<()> {
64 unsafe {
65 self.com_impl
66 .Add(&tx.com_impl, PCWSTR::from_raw(key.as_ptr()), value)
67 }
68 .map_err(crate::Error::from)
69 }
70
71 pub fn get(&self, tx: &TransactionProxy, key: &[u16]) -> crate::Result<KVStoreItemProxy> {
72 let com = unsafe {
73 self.com_impl
74 .Get(&tx.com_impl, PCWSTR::from_raw(key.as_ptr()))
75 }?;
76 Ok(KVStoreItemProxy { com_impl: com })
77 }
78
79 pub fn remove(
83 &self,
84 tx: &TransactionProxy,
85 key: &[u16],
86 checksequencenumber: i64,
87 ) -> crate::Result<()> {
88 unsafe {
89 self.com_impl.Remove(
90 &tx.com_impl,
91 PCWSTR::from_raw(key.as_ptr()),
92 checksequencenumber,
93 )
94 }
95 .map_err(crate::Error::from)
96 }
97}
98
99impl TransactionProxy {
100 pub fn get_id(&self) -> &crate::GUID {
101 unsafe { self.com_impl.get_Id().as_ref().unwrap() }
102 }
103
104 pub fn get_isolation_level(&self) -> TransactionIsolationLevel {
105 unsafe { self.com_impl.get_IsolationLevel().into() }
106 }
107
108 #[cfg_attr(
109 feature = "tracing",
110 tracing::instrument(skip_all, level = "debug", ret, err)
111 )]
112 pub async fn commit(
113 &self,
114 timeoutmilliseconds: u32,
115 cancellation_token: Option<BoxedCancelToken>,
116 ) -> crate::Result<i64> {
117 let com1 = &self.com_impl;
118 let com2 = self.com_impl.clone();
119 let rx = fabric_begin_end_proxy(
120 move |callback| unsafe { com1.BeginCommit(timeoutmilliseconds, callback) },
121 move |ctx| unsafe { com2.EndCommit(ctx) },
122 cancellation_token,
123 );
124 rx.await?.map_err(crate::Error::from)
125 }
126
127 pub fn rollback(&self) {
128 unsafe { self.com_impl.Rollback() };
129 }
130}