1use std::{
2 collections::HashMap,
3 sync::{
4 atomic::{AtomicUsize, Ordering},
5 Arc, Mutex,
6 },
7 time::Duration,
8};
9
10use exocore_protos::{
11 apps::{out_message::OutMessageType, InMessage, MessageStatus, OutMessage},
12 generated::store::{EntityQuery, EntityResults},
13 prost::Message,
14 store::MutationResult,
15};
16use exocore_store::mutation::MutationRequestLike;
17use futures::channel::oneshot;
18
19use crate::{
20 prelude::{sleep, spawn},
21 time::{now, Timestamp},
22};
23
/// Intended time budget for a pending store mutation before it is considered
/// timed out.
const MUTATION_TIMEOUT: Duration = Duration::from_secs(5);
/// Intended time budget for a pending store query before it is considered
/// timed out.
const QUERY_TIMEOUT: Duration = Duration::from_secs(10);
/// Pause between two passes of the background loop that drops timed-out
/// requests.
const TIMEOUT_CHECK_INTERVAL: Duration = Duration::from_secs(1);
28
/// Client-side handle used to send store mutations and queries to the host
/// and to hand incoming responses back to the callers waiting on them.
pub struct Store {
    /// Counter from which the rendez-vous id of each outgoing request is drawn.
    next_rdv: AtomicUsize,
    /// Requests that have been registered and are still waiting for a response.
    inner: Mutex<Inner>,

    /// Test-only stand-in for the host binding: receives every outgoing
    /// message and returns the status to report for it.
    #[cfg(test)]
    host_message_sender: Option<Box<dyn Fn(OutMessage) -> MessageStatus + Send + Sync>>,
}
37
/// Mutex-protected state of a `Store`: the requests awaiting a response,
/// keyed by rendez-vous id.
#[derive(Default)]
struct Inner {
    /// Mutations waiting for their `MutationResult`.
    pending_mutations: HashMap<usize, OneshotRequest<MutationResult>>,
    /// Queries waiting for their `EntityResults`.
    pending_queries: HashMap<usize, OneshotRequest<EntityResults>>,
}
43
/// A single in-flight request waiting for its response of type `T`.
struct OneshotRequest<T> {
    /// Channel on which the response, or the error reported for it, is
    /// delivered to the waiting caller.
    sender: oneshot::Sender<Result<T, StoreError>>,
    /// Deadline of the request; the timeout checks drop the request once this
    /// is earlier than the current time.
    timeout: Timestamp,
}
48
49impl Store {
50 pub(crate) fn new() -> Store {
51 Store {
52 next_rdv: AtomicUsize::new(0),
53 inner: Mutex::new(Inner::default()),
54
55 #[cfg(test)]
56 host_message_sender: None,
57 }
58 }
59
60 pub async fn mutate(
61 self: &Arc<Store>,
62 mutation: impl Into<MutationRequestLike>,
63 ) -> Result<MutationResult, StoreError> {
64 let mutation = mutation.into();
65
66 let rdv = self.next_rdv.fetch_add(1, Ordering::SeqCst);
67 let msg_type = OutMessageType::StoreMutationRequest;
68 let msg = OutMessage {
69 r#type: msg_type.into(),
70 rendez_vous_id: rdv as u32,
71 data: mutation.encode_to_vec(),
72 };
73
74 let (sender, receiver) = oneshot::channel();
75 {
76 let mut inner = self.inner.lock().unwrap();
77 let pending = OneshotRequest {
78 sender,
79 timeout: now() + QUERY_TIMEOUT,
80 };
81 inner.pending_mutations.insert(rdv, pending);
82 }
83
84 self.send_host_message(msg)?;
85
86 receiver.await.map_err(StoreError::from)?
87 }
88
89 pub async fn query(self: &Arc<Store>, query: EntityQuery) -> Result<EntityResults, StoreError> {
90 let rdv = self.next_rdv.fetch_add(1, Ordering::SeqCst);
91 let msg_type = OutMessageType::StoreEntityQuery;
92 let msg = OutMessage {
93 r#type: msg_type.into(),
94 rendez_vous_id: rdv as u32,
95 data: query.encode_to_vec(),
96 };
97
98 let (sender, receiver) = oneshot::channel();
99 {
100 let mut inner = self.inner.lock().unwrap();
101 let pending = OneshotRequest {
102 sender,
103 timeout: now() + MUTATION_TIMEOUT,
104 };
105 inner.pending_queries.insert(rdv, pending);
106 }
107
108 self.send_host_message(msg)?;
109
110 receiver.await.map_err(StoreError::from)?
111 }
112
113 pub(crate) fn handle_mutation_result(&self, msg: InMessage) -> Result<(), MessageStatus> {
114 let mut inner = self.inner.lock().unwrap();
115 let rdv = msg.rendez_vous_id as usize;
116
117 if let Some(req) = inner.pending_mutations.remove(&rdv) {
118 let results = if msg.error.is_empty() {
119 Ok(MutationResult::decode(msg.data.as_ref()).map_err(|err| {
120 error!("Error decoding incoming mutation result: {}", err);
121 MessageStatus::DecodeError
122 })?)
123 } else {
124 Err(StoreError::Remote(msg.error))
125 };
126 let _ = req.sender.send(results);
127 }
128
129 Ok(())
130 }
131
132 pub(crate) fn handle_query_results(&self, msg: InMessage) -> Result<(), MessageStatus> {
133 let mut inner = self.inner.lock().unwrap();
134 let rdv = msg.rendez_vous_id as usize;
135
136 if let Some(req) = inner.pending_queries.remove(&rdv) {
137 let results = if msg.error.is_empty() {
138 Ok(EntityResults::decode(msg.data.as_ref()).map_err(|err| {
139 error!("Error decoding incoming query results: {}", err);
140 MessageStatus::DecodeError
141 })?)
142 } else {
143 Err(StoreError::Remote(msg.error))
144 };
145 let _ = req.sender.send(results);
146 }
147
148 Ok(())
149 }
150
151 pub(crate) fn start(self: &Arc<Store>) {
152 let store = self.clone();
153 spawn(async move {
154 loop {
155 let now = now();
156
157 {
158 let mut inner = store.inner.lock().unwrap();
159 check_timed_out_queries(&mut inner, now);
160 check_timed_out_mutations(&mut inner, now);
161 }
162
163 sleep(TIMEOUT_CHECK_INTERVAL).await;
164 }
165 });
166 }
167
168 #[cfg(not(test))]
169 fn send_host_message(&self, msg: OutMessage) -> Result<(), StoreError> {
170 let encoded = msg.encode_to_vec();
171 unsafe {
172 let code = crate::binding::__exocore_host_out_message(encoded.as_ptr(), encoded.len());
173 StoreError::from_message_status(code as i32)?;
174 }
175
176 Ok(())
177 }
178
179 #[cfg(test)]
180 fn send_host_message(&self, msg: OutMessage) -> Result<(), StoreError> {
181 let sender = self.host_message_sender.as_ref().unwrap();
182 let code = sender(msg);
183 StoreError::from_message_status(code as i32)?;
184
185 Ok(())
186 }
187}
188
189fn check_timed_out_queries(inner: &mut std::sync::MutexGuard<Inner>, now: Timestamp) {
190 let mut timed_out = Vec::new();
191 for (rdv, query) in &inner.pending_queries {
192 if query.timeout < now {
193 timed_out.push(*rdv);
194 }
195 }
196
197 for rdv in timed_out {
198 inner.pending_queries.remove(&rdv);
199 }
200}
201
202fn check_timed_out_mutations(inner: &mut std::sync::MutexGuard<Inner>, now: Timestamp) {
203 let mut timed_out = Vec::new();
204 for (rdv, query) in &inner.pending_mutations {
205 if query.timeout < now {
206 timed_out.push(*rdv);
207 }
208 }
209
210 for rdv in timed_out {
211 inner.pending_mutations.remove(&rdv);
212 }
213}
214
/// Errors returned by store mutations and queries.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Any other failure, including a host status code that does not map to
    /// a known `MessageStatus`.
    #[error(transparent)]
    Unknown(#[from] anyhow::Error),
    /// The host reported a status other than `Ok` for an outgoing message.
    #[error("Host message error: {0:?}")]
    HostMessage(MessageStatus),
    /// The response carried a non-empty error string, stored here.
    #[error("Remote store error: {0:?}")]
    Remote(String),
    /// The response channel was closed before a result was delivered.
    #[error("Query or mutation got cancelled or timed out")]
    Cancelled(#[from] oneshot::Canceled),
}
226
227impl StoreError {
228 fn from_message_status(code: i32) -> Result<(), StoreError> {
229 match MessageStatus::try_from(code) {
230 Ok(MessageStatus::Ok) => Ok(()),
231 Ok(status) => Err(StoreError::HostMessage(status)),
232 Err(err) => Err(StoreError::Unknown(anyhow::anyhow!(
233 "Unknown message status code: {}. err: {err}",
234 code
235 ))),
236 }
237 }
238}
239
#[cfg(test)]
mod tests {
    use exocore_protos::{apps::in_message::InMessageType, store::MutationRequest};
    use futures::{channel::mpsc, StreamExt};

    use super::*;

    /// A mutation is forwarded to the host and stays pending until the
    /// matching result is handed to the store.
    #[tokio::test]
    async fn test_mutation() {
        let (mut host_rx, store) = create_test_store();

        // Run the mutation in the background; its outcome comes back through
        // this channel.
        let (done_tx, mut done_rx) = oneshot::channel();
        {
            let background = store.clone();
            tokio::spawn(async move {
                let outcome = background.mutate(MutationRequest::default()).await;
                done_tx.send(outcome).unwrap();
            });
        }

        let sent = host_rx.next().await.expect("no message sent to host");

        // No response has been delivered yet, so the mutation is still pending.
        assert!(done_rx.try_recv().unwrap().is_none());

        let payload = MutationResult {
            operation_ids: vec![123],
            ..Default::default()
        };
        let reply = InMessage {
            r#type: InMessageType::StoreMutationResult.into(),
            data: payload.encode_to_vec(),
            rendez_vous_id: sent.rendez_vous_id,
            error: String::new(),
        };
        store.handle_mutation_result(reply).unwrap();

        let res = done_rx.await.unwrap().unwrap();
        assert_eq!(res.operation_ids, vec![123]);
    }

    /// A query is forwarded to the host and stays pending until the matching
    /// results are handed to the store.
    #[tokio::test]
    async fn test_query() {
        let (mut host_rx, store) = create_test_store();

        // Run the query in the background; its outcome comes back through
        // this channel.
        let (done_tx, mut done_rx) = oneshot::channel();
        {
            let background = store.clone();
            tokio::spawn(async move {
                let outcome = background.query(EntityQuery::default()).await;
                done_tx.send(outcome).unwrap();
            });
        }

        let sent = host_rx.next().await.expect("no message sent to host");

        // No response has been delivered yet, so the query is still pending.
        assert!(done_rx.try_recv().unwrap().is_none());

        let payload = EntityResults {
            estimated_count: 123,
            ..Default::default()
        };
        let reply = InMessage {
            r#type: InMessageType::StoreEntityResults.into(),
            data: payload.encode_to_vec(),
            rendez_vous_id: sent.rendez_vous_id,
            error: String::new(),
        };
        store.handle_query_results(reply).unwrap();

        let res = done_rx.await.unwrap().unwrap();
        assert_eq!(res.estimated_count, 123);
    }

    /// Builds a store whose outgoing messages land on the returned receiver
    /// and are always acknowledged with `MessageStatus::Ok`.
    fn create_test_store() -> (mpsc::Receiver<OutMessage>, Arc<Store>) {
        let (out_msg_sender, out_msg_rcv) = mpsc::channel(1);

        // `try_send` needs mutable access while the callback is a shared
        // `Fn`, hence the mutex around the sender.
        let shared_sender = Arc::new(Mutex::new(out_msg_sender));

        let mut store = Store::new();
        store.host_message_sender = Some(Box::new(move |msg| {
            shared_sender.lock().unwrap().try_send(msg).unwrap();
            MessageStatus::Ok
        }));

        (out_msg_rcv, Arc::new(store))
    }
}