exocore_store/remote/
server.rs

1use std::{
2    collections::HashMap,
3    sync::{Arc, Mutex, RwLock, Weak},
4};
5
6use exocore_core::{
7    cell::Cell,
8    futures::{interval, OwnedSpawnSet},
9    time::{Duration, Instant},
10};
11use exocore_protos::{
12    generated::{
13        exocore_store::{EntityQuery, EntityResults, MutationRequest},
14        store_transport_capnp::{
15            mutation_request, query_request, unwatch_query_request, watched_query_request,
16        },
17        MessageType,
18    },
19    store::MutationResult,
20};
21use exocore_transport::{
22    messages::MessageReplyToken, InEvent, InMessage, OutEvent, OutMessage, TransportServiceHandle,
23};
24use futures::{
25    channel::{mpsc, oneshot},
26    FutureExt, SinkExt, StreamExt,
27};
28
29use super::seri::{
30    mutation_from_request_frame, mutation_result_to_response_frame, query_from_request_frame,
31    query_results_to_response_frame,
32};
33use crate::{error::Error, query::WatchToken, store::Store};
34
35pub struct Server<CS, PS, T>
36where
37    CS: exocore_chain::chain::ChainStore,
38    PS: exocore_chain::pending::PendingStore,
39    T: TransportServiceHandle,
40{
41    config: ServerConfiguration,
42    inner: Arc<RwLock<Inner<CS, PS>>>,
43    transport_handle: T,
44    transport_out_receiver: mpsc::UnboundedReceiver<OutEvent>,
45}
46
47impl<CS, PS, T> Server<CS, PS, T>
48where
49    CS: exocore_chain::chain::ChainStore,
50    PS: exocore_chain::pending::PendingStore,
51    T: TransportServiceHandle,
52{
53    pub fn new(
54        config: ServerConfiguration,
55        cell: Cell,
56        store_handle: crate::local::StoreHandle<CS, PS>,
57        transport_handle: T,
58    ) -> Result<Server<CS, PS, T>, Error> {
59        let (transport_out_sender, transport_out_receiver) = mpsc::unbounded();
60
61        let inner = Arc::new(RwLock::new(Inner {
62            config,
63            cell,
64            store_handle,
65            watched_queries: HashMap::new(),
66            transport_out_sender,
67        }));
68
69        Ok(Server {
70            config,
71            inner,
72            transport_handle,
73            transport_out_receiver,
74        })
75    }
76
77    pub async fn run(self) -> Result<(), Error> {
78        let mut transport_handle = self.transport_handle;
79
80        // send outgoing messages to transport
81        let mut transport_sink = transport_handle.get_sink();
82        let mut transport_out_receiver = self.transport_out_receiver;
83        let transport_sender = async move {
84            while let Some(event) = transport_out_receiver.next().await {
85                transport_sink.send(event).await?;
86            }
87            Ok::<(), Error>(())
88        };
89
90        // handle incoming messages
91        let weak_inner = Arc::downgrade(&self.inner);
92        let mut transport_stream = transport_handle.get_stream();
93        let transport_receiver = async move {
94            let mut spawn_set = OwnedSpawnSet::new();
95
96            while let Some(event) = transport_stream.next().await {
97                // cleanup any queries that have completed
98                spawn_set = spawn_set.cleanup().await;
99
100                if let InEvent::Message(msg) = event {
101                    trace!(
102                        "Got an incoming message. Spawn set has {} items",
103                        spawn_set.len()
104                    );
105                    if let Err(err) =
106                        Self::handle_incoming_message(&weak_inner, &mut spawn_set, msg)
107                    {
108                        if err.is_fatal() {
109                            return Err(err);
110                        } else {
111                            error!("Couldn't process incoming message: {}", err);
112                        }
113                    }
114                }
115            }
116
117            Ok::<(), Error>(())
118        };
119
120        // management timer
121        let weak_inner = Arc::downgrade(&self.inner);
122        let management_timer_interval = self.config.management_timer_interval;
123        let management_timer = async move {
124            let mut interval = interval(management_timer_interval);
125            loop {
126                interval.tick().await;
127                Self::management_timer_process(&weak_inner)?;
128            }
129
130            // types the async block
131            #[allow(unreachable_code)]
132            Ok::<(), Error>(())
133        };
134
135        info!("Remote store server started");
136
137        futures::select! {
138            _ = transport_sender.fuse() => {},
139            _ = transport_receiver.fuse() => {},
140            _ = management_timer.fuse() => {},
141            _ = transport_handle.fuse() => {},
142        };
143
144        Ok(())
145    }
146
147    fn handle_incoming_message(
148        weak_inner: &Weak<RwLock<Inner<CS, PS>>>,
149        spawn_set: &mut OwnedSpawnSet<()>,
150        in_message: InMessage,
151    ) -> Result<(), Error> {
152        let parsed_message = IncomingMessage::parse_incoming_message(&in_message)?;
153
154        match parsed_message {
155            IncomingMessage::Mutation(mutation) => {
156                Self::handle_incoming_mutation_message(
157                    weak_inner, spawn_set, in_message, mutation,
158                )?;
159            }
160            IncomingMessage::Query(query) => {
161                Self::handle_incoming_query_message(weak_inner, spawn_set, in_message, query)?;
162            }
163            IncomingMessage::WatchedQuery(query) => {
164                Self::handle_incoming_watched_query_message(
165                    weak_inner, spawn_set, in_message, query,
166                )?;
167            }
168            IncomingMessage::UnwatchQuery(token) => {
169                Self::handle_unwatch_query(weak_inner, token)?;
170            }
171        }
172
173        Ok(())
174    }
175
176    fn handle_incoming_query_message(
177        weak_inner: &Weak<RwLock<Inner<CS, PS>>>,
178        spawn_set: &mut OwnedSpawnSet<()>,
179        in_message: InMessage,
180        query: Box<EntityQuery>,
181    ) -> Result<(), Error> {
182        let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
183
184        let future_result = {
185            let inner = inner.read()?;
186            let store_handle = inner.store_handle.clone();
187
188            async move { store_handle.query(query.as_ref().clone()).await }
189        };
190
191        let weak_inner = weak_inner.clone();
192        let send_response = move |result: Result<EntityResults, Error>| -> Result<(), Error> {
193            let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
194            let inner = inner.read()?;
195
196            let resp_frame = query_results_to_response_frame(result)?;
197            let message = in_message.to_response_message(&inner.cell, resp_frame)?;
198            inner.send_message(message)?;
199
200            Ok(())
201        };
202
203        spawn_set.spawn(async move {
204            let result = future_result.await;
205
206            if let Err(err) = &result {
207                error!("Returning error executing incoming query: {}", err);
208            }
209
210            if let Err(err) = send_response(result) {
211                error!("Error sending response for incoming query: {}", err);
212            }
213        });
214
215        Ok(())
216    }
217
218    fn handle_incoming_watched_query_message(
219        weak_inner: &Weak<RwLock<Inner<CS, PS>>>,
220        spawn_set: &mut OwnedSpawnSet<()>,
221        in_message: InMessage,
222        query: Box<EntityQuery>,
223    ) -> Result<(), Error> {
224        let watch_token = query.watch_token;
225
226        // wrap token used to reply to message in an arc mutex so that we can change it
227        // if we get registered again via another connection
228        let reply_token = in_message.get_reply_token()?;
229        let arc_reply_token = Arc::new(Mutex::new(reply_token.clone()));
230
231        let weak_inner1 = weak_inner.clone();
232        let (result_stream, drop_receiver) = {
233            // check if this query already exists. if so, just update its last register
234            let inner = weak_inner1.upgrade().ok_or(Error::Dropped)?;
235            let mut inner = inner.write()?;
236            if let Some(watch_query) = inner.watched_queries.get_mut(&watch_token) {
237                watch_query.last_register = Instant::now();
238                watch_query.set_reply_token(reply_token);
239                return Ok(());
240            }
241
242            // register query
243            let (drop_sender, drop_receiver) = oneshot::channel();
244            let registered_watched_query = RegisteredWatchedQuery {
245                last_register: Instant::now(),
246                reply_token: arc_reply_token.clone(),
247                _drop_sender: drop_sender,
248            };
249            inner
250                .watched_queries
251                .insert(watch_token, registered_watched_query);
252
253            let result_stream = inner.store_handle.watched_query(query.as_ref().clone())?;
254
255            (result_stream, drop_receiver)
256        };
257
258        let weak_inner1 = weak_inner.clone();
259        let send_response = move |result: Result<EntityResults, Error>| -> Result<(), Error> {
260            let inner = weak_inner1.upgrade().ok_or(Error::Dropped)?;
261            let inner = inner.read()?;
262
263            let reply_token = arc_reply_token.lock().unwrap();
264            let resp_frame = query_results_to_response_frame(result)?;
265            let message = reply_token.to_response_message(&inner.cell, resp_frame)?;
266            inner.send_message(message)?;
267
268            Ok(())
269        };
270
271        spawn_set.spawn(async move {
272            let send_response1 = send_response.clone();
273            let stream_consumer = async move {
274                let mut result_stream = result_stream;
275                while let Some(result) = result_stream.next().await {
276                    if let Err(err) = &result {
277                        error!("Returning error executing incoming query: {}", err);
278                    }
279
280                    if let Err(err) = send_response1(result) {
281                        error!("Error sending response to watched query: {}", err);
282                        break;
283                    }
284                }
285            };
286
287            futures::select! {
288                _ = stream_consumer.fuse() => {},
289                _ = drop_receiver.fuse() => {
290                    debug!("Registered query with token {:?} got dropped", watch_token);
291                   let _ = send_response(Err(Error::WatchedUnregistered));
292                },
293            };
294        });
295
296        Ok(())
297    }
298
299    fn handle_incoming_mutation_message(
300        weak_inner: &Weak<RwLock<Inner<CS, PS>>>,
301        spawn_set: &mut OwnedSpawnSet<()>,
302        in_message: InMessage,
303        request: Box<MutationRequest>,
304    ) -> Result<(), Error> {
305        let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
306
307        let future_result = {
308            let inner = inner.read()?;
309            let store_handle = inner.store_handle.clone();
310
311            async move { store_handle.mutate(request.as_ref().clone()).await }
312        };
313
314        let weak_inner = weak_inner.clone();
315        let send_response = move |result: Result<MutationResult, Error>| -> Result<(), Error> {
316            let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
317            let inner = inner.read()?;
318
319            let resp_frame = mutation_result_to_response_frame(result)?;
320            let message = in_message.to_response_message(&inner.cell, resp_frame)?;
321
322            inner.send_message(message)?;
323
324            Ok(())
325        };
326
327        spawn_set.spawn(async move {
328            let result = future_result.await;
329
330            if let Err(err) = &result {
331                error!("Returning error executing incoming mutation: {}", err);
332            }
333
334            if let Err(err) = send_response(result) {
335                error!("Error sending response for incoming mutation: {}", err);
336            }
337        });
338
339        Ok(())
340    }
341
342    fn handle_unwatch_query(
343        weak_inner: &Weak<RwLock<Inner<CS, PS>>>,
344        token: WatchToken,
345    ) -> Result<(), Error> {
346        let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
347        let mut inner = inner.write()?;
348        inner.watched_queries.remove(&token);
349        Ok(())
350    }
351
352    fn management_timer_process(weak_inner: &Weak<RwLock<Inner<CS, PS>>>) -> Result<(), Error> {
353        let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
354        let mut inner = inner.write()?;
355
356        let timeout_duration = inner.config.watched_queries_register_timeout;
357        let mut timed_out_tokens = Vec::new();
358        for (token, watched_query) in &mut inner.watched_queries {
359            if watched_query.last_register.elapsed() > timeout_duration {
360                debug!(
361                    "Watched query with token={:?} timed out after {:?}, dropping it",
362                    token,
363                    watched_query.last_register.elapsed(),
364                );
365                timed_out_tokens.push(*token);
366            }
367        }
368
369        for token in timed_out_tokens {
370            inner.watched_queries.remove(&token);
371        }
372
373        Ok(())
374    }
375}
376
/// Settings of the remote store server.
#[derive(Debug, Clone, Copy)]
pub struct ServerConfiguration {
    /// Maximum time a watched query is kept without being registered again
    /// before the management timer drops it.
    pub watched_queries_register_timeout: Duration,

    /// Interval at which the management timer runs.
    pub management_timer_interval: Duration,
}

impl Default for ServerConfiguration {
    /// Watched queries time out after 30 seconds and the management timer
    /// runs every 500 milliseconds.
    fn default() -> Self {
        ServerConfiguration {
            watched_queries_register_timeout: Duration::from_secs(30),
            management_timer_interval: Duration::from_millis(500),
        }
    }
}
391
392struct Inner<CS, PS>
393where
394    CS: exocore_chain::chain::ChainStore,
395    PS: exocore_chain::pending::PendingStore,
396{
397    config: ServerConfiguration,
398    cell: Cell,
399    store_handle: crate::local::StoreHandle<CS, PS>,
400    watched_queries: HashMap<WatchToken, RegisteredWatchedQuery>,
401    transport_out_sender: mpsc::UnboundedSender<OutEvent>,
402}
403
404impl<CS, PS> Inner<CS, PS>
405where
406    CS: exocore_chain::chain::ChainStore,
407    PS: exocore_chain::pending::PendingStore,
408{
409    fn send_message(&self, message: OutMessage) -> Result<(), Error> {
410        self.transport_out_sender
411            .unbounded_send(OutEvent::Message(message))
412            .map_err(|_err| {
413                Error::Fatal(anyhow!(
414                    "Tried to send message, but transport_out channel is closed"
415                ))
416            })?;
417
418        Ok(())
419    }
420}
421
422enum IncomingMessage {
423    Mutation(Box<MutationRequest>),
424    Query(Box<EntityQuery>),
425    WatchedQuery(Box<EntityQuery>),
426    UnwatchQuery(WatchToken),
427}
428
429impl IncomingMessage {
430    fn parse_incoming_message(in_message: &InMessage) -> Result<IncomingMessage, Error> {
431        match in_message.typ {
432            <mutation_request::Owned as MessageType>::MESSAGE_TYPE => {
433                let frame = in_message.get_data_as_framed_message()?;
434                let mutation = mutation_from_request_frame(frame)?;
435                Ok(IncomingMessage::Mutation(Box::new(mutation)))
436            }
437            <query_request::Owned as MessageType>::MESSAGE_TYPE => {
438                let frame = in_message.get_data_as_framed_message()?;
439                let query = query_from_request_frame(frame)?;
440                Ok(IncomingMessage::Query(Box::new(query)))
441            }
442            <watched_query_request::Owned as MessageType>::MESSAGE_TYPE => {
443                let frame = in_message.get_data_as_framed_message()?;
444                let query = query_from_request_frame(frame)?;
445                Ok(IncomingMessage::WatchedQuery(Box::new(query)))
446            }
447            <unwatch_query_request::Owned as MessageType>::MESSAGE_TYPE => {
448                let frame =
449                    in_message.get_data_as_framed_message::<unwatch_query_request::Owned>()?;
450                let reader = frame.get_reader()?;
451                let watch_token = reader.get_token();
452                Ok(IncomingMessage::UnwatchQuery(watch_token))
453            }
454            other => Err(anyhow!("Received message of unknown type: {}", other).into()),
455        }
456    }
457}
458
459struct RegisteredWatchedQuery {
460    last_register: Instant,
461    reply_token: Arc<Mutex<MessageReplyToken>>,
462
463    // selected by stream's future to get killed if we drop this query for timeout
464    _drop_sender: oneshot::Sender<()>,
465}
466
467impl RegisteredWatchedQuery {
468    fn set_reply_token(&self, token: MessageReplyToken) {
469        let mut reply_token = self.reply_token.lock().unwrap();
470        *reply_token = token;
471    }
472}