mssf_core/runtime/
store_proxy.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use crate::{PCWSTR, runtime::executor::BoxedCancelToken};
7use mssf_com::{
8    FabricRuntime::{
9        IFabricKeyValueStoreItemResult, IFabricKeyValueStoreReplica2, IFabricTransaction,
10    },
11    FabricTypes::{FABRIC_KEY_VALUE_STORE_ITEM, FABRIC_KEY_VALUE_STORE_ITEM_METADATA},
12};
13
14use crate::sync::fabric_begin_end_proxy;
15
16use crate::types::TransactionIsolationLevel;
17
/// Wrapper around the Service Fabric key-value store replica COM interface.
///
/// Holds an `IFabricKeyValueStoreReplica2` and exposes its operations through
/// methods returning `crate::Result`.
#[derive(Clone)]
pub struct KVStoreProxy {
    // Underlying COM interface that the proxy methods forward to.
    com_impl: IFabricKeyValueStoreReplica2,
}
23
/// Wrapper around an `IFabricTransaction` COM interface.
///
/// Instances are produced by `KVStoreProxy::create_transaction` and are passed
/// back to the store's `add`, `get` and `remove` operations.
pub struct TransactionProxy {
    // Underlying COM transaction object.
    com_impl: IFabricTransaction,
}
27
/// Wrapper around an `IFabricKeyValueStoreItemResult` COM interface.
///
/// Instances are produced by `KVStoreProxy::get`; the key and value are read
/// through the accessor methods on this type.
pub struct KVStoreItemProxy {
    // Underlying COM result object that the item data is read from.
    com_impl: IFabricKeyValueStoreItemResult,
}
31
32impl KVStoreItemProxy {
33    pub fn key(&self) -> &[u16] {
34        let item = self.get_item_inner();
35        let meta = Self::get_meta_inner(item);
36        unsafe { meta.Key.as_wide() }
37    }
38
39    pub fn val(&self) -> &[u8] {
40        let item = self.get_item_inner();
41        let meta = Self::get_meta_inner(item);
42        unsafe { std::slice::from_raw_parts(item.Value, meta.ValueSizeInBytes as usize) }
43    }
44
45    fn get_item_inner(&self) -> &FABRIC_KEY_VALUE_STORE_ITEM {
46        unsafe { self.com_impl.get_Item().as_ref().unwrap() }
47    }
48    fn get_meta_inner(item: &FABRIC_KEY_VALUE_STORE_ITEM) -> &FABRIC_KEY_VALUE_STORE_ITEM_METADATA {
49        unsafe { item.Metadata.as_ref().unwrap() }
50    }
51}
52
53impl KVStoreProxy {
54    pub fn new(com_impl: IFabricKeyValueStoreReplica2) -> KVStoreProxy {
55        KVStoreProxy { com_impl }
56    }
57
58    pub fn create_transaction(&self) -> crate::Result<TransactionProxy> {
59        let tx = unsafe { self.com_impl.CreateTransaction() }?;
60        Ok(TransactionProxy { com_impl: tx })
61    }
62
63    pub fn add(&self, tx: &TransactionProxy, key: &[u16], value: &[u8]) -> crate::Result<()> {
64        unsafe {
65            self.com_impl
66                .Add(&tx.com_impl, PCWSTR::from_raw(key.as_ptr()), value)
67        }
68        .map_err(crate::Error::from)
69    }
70
71    pub fn get(&self, tx: &TransactionProxy, key: &[u16]) -> crate::Result<KVStoreItemProxy> {
72        let com = unsafe {
73            self.com_impl
74                .Get(&tx.com_impl, PCWSTR::from_raw(key.as_ptr()))
75        }?;
76        Ok(KVStoreItemProxy { com_impl: com })
77    }
78
79    // check sequence number is the lsn that last time the key got modified.
80    // if lsn does not match the remove will error out.
81    // specify 0 to ignore check.
82    pub fn remove(
83        &self,
84        tx: &TransactionProxy,
85        key: &[u16],
86        checksequencenumber: i64,
87    ) -> crate::Result<()> {
88        unsafe {
89            self.com_impl.Remove(
90                &tx.com_impl,
91                PCWSTR::from_raw(key.as_ptr()),
92                checksequencenumber,
93            )
94        }
95        .map_err(crate::Error::from)
96    }
97}
98
99impl TransactionProxy {
100    pub fn get_id(&self) -> &crate::GUID {
101        unsafe { self.com_impl.get_Id().as_ref().unwrap() }
102    }
103
104    pub fn get_isolation_level(&self) -> TransactionIsolationLevel {
105        unsafe { self.com_impl.get_IsolationLevel().into() }
106    }
107
108    #[cfg_attr(
109        feature = "tracing",
110        tracing::instrument(skip_all, level = "debug", ret, err)
111    )]
112    pub async fn commit(
113        &self,
114        timeoutmilliseconds: u32,
115        cancellation_token: Option<BoxedCancelToken>,
116    ) -> crate::Result<i64> {
117        let com1 = &self.com_impl;
118        let com2 = self.com_impl.clone();
119        let rx = fabric_begin_end_proxy(
120            move |callback| unsafe { com1.BeginCommit(timeoutmilliseconds, callback) },
121            move |ctx| unsafe { com2.EndCommit(ctx) },
122            cancellation_token,
123        );
124        rx.await?.map_err(crate::Error::from)
125    }
126
127    pub fn rollback(&self) {
128        unsafe { self.com_impl.Rollback() };
129    }
130}