Skip to main content

exocore_apps_sdk/
store.rs

1use std::{
2    collections::HashMap,
3    sync::{
4        atomic::{AtomicUsize, Ordering},
5        Arc, Mutex,
6    },
7    time::Duration,
8};
9
10use exocore_protos::{
11    apps::{out_message::OutMessageType, InMessage, MessageStatus, OutMessage},
12    generated::store::{EntityQuery, EntityResults},
13    prost::Message,
14    store::MutationResult,
15};
16use exocore_store::mutation::MutationRequestLike;
17use futures::channel::oneshot;
18
19use crate::{
20    prelude::{sleep, spawn},
21    time::{now, Timestamp},
22};
23
// Keep in sync with remote store `ClientConfiguration`
/// Deadline meant for pending mutation requests (5 seconds).
const MUTATION_TIMEOUT: Duration = Duration::from_secs(5);
/// Deadline meant for pending query requests (10 seconds).
const QUERY_TIMEOUT: Duration = Duration::from_secs(10);
/// Pause between two passes of the background task spawned by `Store::start`,
/// which drops pending requests whose deadline has passed.
const TIMEOUT_CHECK_INTERVAL: Duration = Duration::from_secs(1);
28
/// Exocore entities store client.
///
/// Mutations and queries are encoded into `OutMessage`s and handed to the host.
/// Each request is tagged with a rendez-vous id so that the matching result,
/// delivered later through `handle_mutation_result` / `handle_query_results`,
/// can be routed back to the caller awaiting it.
pub struct Store {
    /// Counter from which every outgoing request draws its rendez-vous id.
    next_rdv: AtomicUsize,
    /// Requests that were registered and are still waiting for a result.
    inner: Mutex<Inner>,

    /// Test-only hook that receives outgoing messages in place of the host binding.
    #[cfg(test)]
    host_message_sender: Option<Box<dyn Fn(OutMessage) -> MessageStatus + Send + Sync>>,
}
37
/// Mutable state of the store client, kept behind `Store::inner`'s mutex.
#[derive(Default)]
struct Inner {
    /// Mutations awaiting a result, keyed by rendez-vous id.
    pending_mutations: HashMap<usize, OneshotRequest<MutationResult>>,
    /// Queries awaiting results, keyed by rendez-vous id.
    pending_queries: HashMap<usize, OneshotRequest<EntityResults>>,
}
43
/// A request that has been registered and is waiting for its single result.
struct OneshotRequest<T> {
    /// Channel on which the result (or the error) is delivered to the awaiting caller.
    sender: oneshot::Sender<Result<T, StoreError>>,
    /// Deadline of the request; once it is in the past, the timeout checks remove the
    /// request, which drops `sender`.
    timeout: Timestamp,
}
48
49impl Store {
50    pub(crate) fn new() -> Store {
51        Store {
52            next_rdv: AtomicUsize::new(0),
53            inner: Mutex::new(Inner::default()),
54
55            #[cfg(test)]
56            host_message_sender: None,
57        }
58    }
59
60    pub async fn mutate(
61        self: &Arc<Store>,
62        mutation: impl Into<MutationRequestLike>,
63    ) -> Result<MutationResult, StoreError> {
64        let mutation = mutation.into();
65
66        let rdv = self.next_rdv.fetch_add(1, Ordering::SeqCst);
67        let msg_type = OutMessageType::StoreMutationRequest;
68        let msg = OutMessage {
69            r#type: msg_type.into(),
70            rendez_vous_id: rdv as u32,
71            data: mutation.encode_to_vec(),
72        };
73
74        let (sender, receiver) = oneshot::channel();
75        {
76            let mut inner = self.inner.lock().unwrap();
77            let pending = OneshotRequest {
78                sender,
79                timeout: now() + QUERY_TIMEOUT,
80            };
81            inner.pending_mutations.insert(rdv, pending);
82        }
83
84        self.send_host_message(msg)?;
85
86        receiver.await.map_err(StoreError::from)?
87    }
88
89    pub async fn query(self: &Arc<Store>, query: EntityQuery) -> Result<EntityResults, StoreError> {
90        let rdv = self.next_rdv.fetch_add(1, Ordering::SeqCst);
91        let msg_type = OutMessageType::StoreEntityQuery;
92        let msg = OutMessage {
93            r#type: msg_type.into(),
94            rendez_vous_id: rdv as u32,
95            data: query.encode_to_vec(),
96        };
97
98        let (sender, receiver) = oneshot::channel();
99        {
100            let mut inner = self.inner.lock().unwrap();
101            let pending = OneshotRequest {
102                sender,
103                timeout: now() + MUTATION_TIMEOUT,
104            };
105            inner.pending_queries.insert(rdv, pending);
106        }
107
108        self.send_host_message(msg)?;
109
110        receiver.await.map_err(StoreError::from)?
111    }
112
113    pub(crate) fn handle_mutation_result(&self, msg: InMessage) -> Result<(), MessageStatus> {
114        let mut inner = self.inner.lock().unwrap();
115        let rdv = msg.rendez_vous_id as usize;
116
117        if let Some(req) = inner.pending_mutations.remove(&rdv) {
118            let results = if msg.error.is_empty() {
119                Ok(MutationResult::decode(msg.data.as_ref()).map_err(|err| {
120                    error!("Error decoding incoming mutation result: {}", err);
121                    MessageStatus::DecodeError
122                })?)
123            } else {
124                Err(StoreError::Remote(msg.error))
125            };
126            let _ = req.sender.send(results);
127        }
128
129        Ok(())
130    }
131
132    pub(crate) fn handle_query_results(&self, msg: InMessage) -> Result<(), MessageStatus> {
133        let mut inner = self.inner.lock().unwrap();
134        let rdv = msg.rendez_vous_id as usize;
135
136        if let Some(req) = inner.pending_queries.remove(&rdv) {
137            let results = if msg.error.is_empty() {
138                Ok(EntityResults::decode(msg.data.as_ref()).map_err(|err| {
139                    error!("Error decoding incoming query results: {}", err);
140                    MessageStatus::DecodeError
141                })?)
142            } else {
143                Err(StoreError::Remote(msg.error))
144            };
145            let _ = req.sender.send(results);
146        }
147
148        Ok(())
149    }
150
151    pub(crate) fn start(self: &Arc<Store>) {
152        let store = self.clone();
153        spawn(async move {
154            loop {
155                let now = now();
156
157                {
158                    let mut inner = store.inner.lock().unwrap();
159                    check_timed_out_queries(&mut inner, now);
160                    check_timed_out_mutations(&mut inner, now);
161                }
162
163                sleep(TIMEOUT_CHECK_INTERVAL).await;
164            }
165        });
166    }
167
168    #[cfg(not(test))]
169    fn send_host_message(&self, msg: OutMessage) -> Result<(), StoreError> {
170        let encoded = msg.encode_to_vec();
171        unsafe {
172            let code = crate::binding::__exocore_host_out_message(encoded.as_ptr(), encoded.len());
173            StoreError::from_message_status(code as i32)?;
174        }
175
176        Ok(())
177    }
178
179    #[cfg(test)]
180    fn send_host_message(&self, msg: OutMessage) -> Result<(), StoreError> {
181        let sender = self.host_message_sender.as_ref().unwrap();
182        let code = sender(msg);
183        StoreError::from_message_status(code as i32)?;
184
185        Ok(())
186    }
187}
188
189fn check_timed_out_queries(inner: &mut std::sync::MutexGuard<Inner>, now: Timestamp) {
190    let mut timed_out = Vec::new();
191    for (rdv, query) in &inner.pending_queries {
192        if query.timeout < now {
193            timed_out.push(*rdv);
194        }
195    }
196
197    for rdv in timed_out {
198        inner.pending_queries.remove(&rdv);
199    }
200}
201
202fn check_timed_out_mutations(inner: &mut std::sync::MutexGuard<Inner>, now: Timestamp) {
203    let mut timed_out = Vec::new();
204    for (rdv, query) in &inner.pending_mutations {
205        if query.timeout < now {
206            timed_out.push(*rdv);
207        }
208    }
209
210    for rdv in timed_out {
211        inner.pending_mutations.remove(&rdv);
212    }
213}
214
/// Errors returned by the store client.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Any other failure, e.g. a host status code that is not a known `MessageStatus`.
    #[error(transparent)]
    Unknown(#[from] anyhow::Error),
    /// The host answered an outgoing message with a status other than `Ok`.
    #[error("Host message error: {0:?}")]
    HostMessage(MessageStatus),
    /// The result message carried a non-empty error string, which is wrapped here.
    #[error("Remote store error: {0:?}")]
    Remote(String),
    /// The request's sender was dropped before a result was delivered, which happens
    /// when the pending request is removed by the timeout checks.
    #[error("Query or mutation got cancelled or timed out")]
    Cancelled(#[from] oneshot::Canceled),
}
226
227impl StoreError {
228    fn from_message_status(code: i32) -> Result<(), StoreError> {
229        match MessageStatus::try_from(code) {
230            Ok(MessageStatus::Ok) => Ok(()),
231            Ok(status) => Err(StoreError::HostMessage(status)),
232            Err(err) => Err(StoreError::Unknown(anyhow::anyhow!(
233                "Unknown message status code: {}. err: {err}",
234                code
235            ))),
236        }
237    }
238}
239
240#[cfg(test)]
241mod tests {
242    use exocore_protos::{apps::in_message::InMessageType, store::MutationRequest};
243    use futures::{channel::mpsc, StreamExt};
244
245    use super::*;
246
    // Round trip of a mutation: the request must reach the (fake) host, stay pending
    // until the host answers, then resolve with the decoded `MutationResult`.
    #[tokio::test]
    async fn test_mutation() {
        let (mut out_msg_rcv, store) = create_test_store();

        // spawn a mutation request
        let (res_sender, mut res_receiver) = oneshot::channel();
        {
            let store = store.clone();
            tokio::spawn(async move {
                let res = store.mutate(MutationRequest::default()).await;
                res_sender.send(res).unwrap();
            });
        }

        // the mutation should have been sent to host
        let out_msg = out_msg_rcv.next().await.expect("no message sent to host");

        // mutation shouldn't have resolved yet since we didn't send results back
        assert!(res_receiver.try_recv().unwrap().is_none());

        // host sends back results, tagged with the rendez-vous id of the request so
        // that they get routed to the pending mutation
        store
            .handle_mutation_result(InMessage {
                r#type: InMessageType::StoreMutationResult.into(),
                data: MutationResult {
                    operation_ids: vec![123],
                    ..Default::default()
                }
                .encode_to_vec(),
                rendez_vous_id: out_msg.rendez_vous_id,
                error: String::new(),
            })
            .unwrap();

        // mutation should now have been resolved
        let res = res_receiver.await.unwrap().unwrap();
        assert_eq!(res.operation_ids, vec![123]);
    }
285
    // Round trip of a query: the request must reach the (fake) host, stay pending
    // until the host answers, then resolve with the decoded `EntityResults`.
    #[tokio::test]
    async fn test_query() {
        let (mut out_msg_rcv, store) = create_test_store();

        // spawn a query
        let (res_sender, mut res_receiver) = oneshot::channel();
        {
            let store = store.clone();
            tokio::spawn(async move {
                let res = store.query(EntityQuery::default()).await;
                res_sender.send(res).unwrap();
            });
        }

        // the query should have been sent to host
        let out_msg = out_msg_rcv.next().await.expect("no message sent to host");

        // query shouldn't have resolved yet since we didn't send results back
        assert!(res_receiver.try_recv().unwrap().is_none());

        // host sends back results, tagged with the rendez-vous id of the request so
        // that they get routed to the pending query
        store
            .handle_query_results(InMessage {
                r#type: InMessageType::StoreEntityResults.into(),
                data: EntityResults {
                    estimated_count: 123,
                    ..Default::default()
                }
                .encode_to_vec(),
                rendez_vous_id: out_msg.rendez_vous_id,
                error: String::new(),
            })
            .unwrap();

        // query should now have been resolved
        let res = res_receiver.await.unwrap().unwrap();
        assert_eq!(res.estimated_count, 123);
    }
324
325    fn create_test_store() -> (mpsc::Receiver<OutMessage>, Arc<Store>) {
326        let (out_msg_sender, out_msg_rcv) = mpsc::channel(1);
327        let store = {
328            let mut store = Store::new();
329            let out_msg_sender = Arc::new(Mutex::new(out_msg_sender));
330            store.host_message_sender = Some(Box::new(move |msg| {
331                let mut out_msg_sender = out_msg_sender.lock().unwrap();
332                out_msg_sender.try_send(msg).unwrap();
333                MessageStatus::Ok
334            }));
335            Arc::new(store)
336        };
337
338        (out_msg_rcv, store)
339    }
340}