1use std::{
2 collections::HashMap,
3 sync::{Arc, Mutex, RwLock, Weak},
4};
5
6use exocore_core::{
7 cell::Cell,
8 futures::{interval, OwnedSpawnSet},
9 time::{Duration, Instant},
10};
11use exocore_protos::{
12 generated::{
13 exocore_store::{EntityQuery, EntityResults, MutationRequest},
14 store_transport_capnp::{
15 mutation_request, query_request, unwatch_query_request, watched_query_request,
16 },
17 MessageType,
18 },
19 store::MutationResult,
20};
21use exocore_transport::{
22 messages::MessageReplyToken, InEvent, InMessage, OutEvent, OutMessage, TransportServiceHandle,
23};
24use futures::{
25 channel::{mpsc, oneshot},
26 FutureExt, SinkExt, StreamExt,
27};
28
29use super::seri::{
30 mutation_from_request_frame, mutation_result_to_response_frame, query_from_request_frame,
31 query_results_to_response_frame,
32};
33use crate::{error::Error, query::WatchToken, store::Store};
34
/// Server side of the remote store.
///
/// It receives mutation, query, watched query and unwatch requests from a
/// transport handle, executes them against a local store handle and queues
/// the responses back onto the transport.
pub struct Server<CS, PS, T>
where
    CS: exocore_chain::chain::ChainStore,
    PS: exocore_chain::pending::PendingStore,
    T: TransportServiceHandle,
{
    /// Copy of the server configuration; `run` reads the management timer
    /// interval from it.
    config: ServerConfiguration,
    /// State shared with the request handlers, which only receive weak
    /// references to it.
    inner: Arc<RwLock<Inner<CS, PS>>>,
    /// Transport handle from which `run` obtains the outgoing sink and the
    /// incoming stream.
    transport_handle: T,
    /// Receiving end of the queue of outgoing events; `run` drains it into the
    /// transport's sink.
    transport_out_receiver: mpsc::UnboundedReceiver<OutEvent>,
}
46
47impl<CS, PS, T> Server<CS, PS, T>
48where
49 CS: exocore_chain::chain::ChainStore,
50 PS: exocore_chain::pending::PendingStore,
51 T: TransportServiceHandle,
52{
53 pub fn new(
54 config: ServerConfiguration,
55 cell: Cell,
56 store_handle: crate::local::StoreHandle<CS, PS>,
57 transport_handle: T,
58 ) -> Result<Server<CS, PS, T>, Error> {
59 let (transport_out_sender, transport_out_receiver) = mpsc::unbounded();
60
61 let inner = Arc::new(RwLock::new(Inner {
62 config,
63 cell,
64 store_handle,
65 watched_queries: HashMap::new(),
66 transport_out_sender,
67 }));
68
69 Ok(Server {
70 config,
71 inner,
72 transport_handle,
73 transport_out_receiver,
74 })
75 }
76
77 pub async fn run(self) -> Result<(), Error> {
78 let mut transport_handle = self.transport_handle;
79
80 let mut transport_sink = transport_handle.get_sink();
82 let mut transport_out_receiver = self.transport_out_receiver;
83 let transport_sender = async move {
84 while let Some(event) = transport_out_receiver.next().await {
85 transport_sink.send(event).await?;
86 }
87 Ok::<(), Error>(())
88 };
89
90 let weak_inner = Arc::downgrade(&self.inner);
92 let mut transport_stream = transport_handle.get_stream();
93 let transport_receiver = async move {
94 let mut spawn_set = OwnedSpawnSet::new();
95
96 while let Some(event) = transport_stream.next().await {
97 spawn_set = spawn_set.cleanup().await;
99
100 if let InEvent::Message(msg) = event {
101 trace!(
102 "Got an incoming message. Spawn set has {} items",
103 spawn_set.len()
104 );
105 if let Err(err) =
106 Self::handle_incoming_message(&weak_inner, &mut spawn_set, msg)
107 {
108 if err.is_fatal() {
109 return Err(err);
110 } else {
111 error!("Couldn't process incoming message: {}", err);
112 }
113 }
114 }
115 }
116
117 Ok::<(), Error>(())
118 };
119
120 let weak_inner = Arc::downgrade(&self.inner);
122 let management_timer_interval = self.config.management_timer_interval;
123 let management_timer = async move {
124 let mut interval = interval(management_timer_interval);
125 loop {
126 interval.tick().await;
127 Self::management_timer_process(&weak_inner)?;
128 }
129
130 #[allow(unreachable_code)]
132 Ok::<(), Error>(())
133 };
134
135 info!("Remote store server started");
136
137 futures::select! {
138 _ = transport_sender.fuse() => {},
139 _ = transport_receiver.fuse() => {},
140 _ = management_timer.fuse() => {},
141 _ = transport_handle.fuse() => {},
142 };
143
144 Ok(())
145 }
146
147 fn handle_incoming_message(
148 weak_inner: &Weak<RwLock<Inner<CS, PS>>>,
149 spawn_set: &mut OwnedSpawnSet<()>,
150 in_message: InMessage,
151 ) -> Result<(), Error> {
152 let parsed_message = IncomingMessage::parse_incoming_message(&in_message)?;
153
154 match parsed_message {
155 IncomingMessage::Mutation(mutation) => {
156 Self::handle_incoming_mutation_message(
157 weak_inner, spawn_set, in_message, mutation,
158 )?;
159 }
160 IncomingMessage::Query(query) => {
161 Self::handle_incoming_query_message(weak_inner, spawn_set, in_message, query)?;
162 }
163 IncomingMessage::WatchedQuery(query) => {
164 Self::handle_incoming_watched_query_message(
165 weak_inner, spawn_set, in_message, query,
166 )?;
167 }
168 IncomingMessage::UnwatchQuery(token) => {
169 Self::handle_unwatch_query(weak_inner, token)?;
170 }
171 }
172
173 Ok(())
174 }
175
176 fn handle_incoming_query_message(
177 weak_inner: &Weak<RwLock<Inner<CS, PS>>>,
178 spawn_set: &mut OwnedSpawnSet<()>,
179 in_message: InMessage,
180 query: Box<EntityQuery>,
181 ) -> Result<(), Error> {
182 let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
183
184 let future_result = {
185 let inner = inner.read()?;
186 let store_handle = inner.store_handle.clone();
187
188 async move { store_handle.query(query.as_ref().clone()).await }
189 };
190
191 let weak_inner = weak_inner.clone();
192 let send_response = move |result: Result<EntityResults, Error>| -> Result<(), Error> {
193 let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
194 let inner = inner.read()?;
195
196 let resp_frame = query_results_to_response_frame(result)?;
197 let message = in_message.to_response_message(&inner.cell, resp_frame)?;
198 inner.send_message(message)?;
199
200 Ok(())
201 };
202
203 spawn_set.spawn(async move {
204 let result = future_result.await;
205
206 if let Err(err) = &result {
207 error!("Returning error executing incoming query: {}", err);
208 }
209
210 if let Err(err) = send_response(result) {
211 error!("Error sending response for incoming query: {}", err);
212 }
213 });
214
215 Ok(())
216 }
217
218 fn handle_incoming_watched_query_message(
219 weak_inner: &Weak<RwLock<Inner<CS, PS>>>,
220 spawn_set: &mut OwnedSpawnSet<()>,
221 in_message: InMessage,
222 query: Box<EntityQuery>,
223 ) -> Result<(), Error> {
224 let watch_token = query.watch_token;
225
226 let reply_token = in_message.get_reply_token()?;
229 let arc_reply_token = Arc::new(Mutex::new(reply_token.clone()));
230
231 let weak_inner1 = weak_inner.clone();
232 let (result_stream, drop_receiver) = {
233 let inner = weak_inner1.upgrade().ok_or(Error::Dropped)?;
235 let mut inner = inner.write()?;
236 if let Some(watch_query) = inner.watched_queries.get_mut(&watch_token) {
237 watch_query.last_register = Instant::now();
238 watch_query.set_reply_token(reply_token);
239 return Ok(());
240 }
241
242 let (drop_sender, drop_receiver) = oneshot::channel();
244 let registered_watched_query = RegisteredWatchedQuery {
245 last_register: Instant::now(),
246 reply_token: arc_reply_token.clone(),
247 _drop_sender: drop_sender,
248 };
249 inner
250 .watched_queries
251 .insert(watch_token, registered_watched_query);
252
253 let result_stream = inner.store_handle.watched_query(query.as_ref().clone())?;
254
255 (result_stream, drop_receiver)
256 };
257
258 let weak_inner1 = weak_inner.clone();
259 let send_response = move |result: Result<EntityResults, Error>| -> Result<(), Error> {
260 let inner = weak_inner1.upgrade().ok_or(Error::Dropped)?;
261 let inner = inner.read()?;
262
263 let reply_token = arc_reply_token.lock().unwrap();
264 let resp_frame = query_results_to_response_frame(result)?;
265 let message = reply_token.to_response_message(&inner.cell, resp_frame)?;
266 inner.send_message(message)?;
267
268 Ok(())
269 };
270
271 spawn_set.spawn(async move {
272 let send_response1 = send_response.clone();
273 let stream_consumer = async move {
274 let mut result_stream = result_stream;
275 while let Some(result) = result_stream.next().await {
276 if let Err(err) = &result {
277 error!("Returning error executing incoming query: {}", err);
278 }
279
280 if let Err(err) = send_response1(result) {
281 error!("Error sending response to watched query: {}", err);
282 break;
283 }
284 }
285 };
286
287 futures::select! {
288 _ = stream_consumer.fuse() => {},
289 _ = drop_receiver.fuse() => {
290 debug!("Registered query with token {:?} got dropped", watch_token);
291 let _ = send_response(Err(Error::WatchedUnregistered));
292 },
293 };
294 });
295
296 Ok(())
297 }
298
299 fn handle_incoming_mutation_message(
300 weak_inner: &Weak<RwLock<Inner<CS, PS>>>,
301 spawn_set: &mut OwnedSpawnSet<()>,
302 in_message: InMessage,
303 request: Box<MutationRequest>,
304 ) -> Result<(), Error> {
305 let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
306
307 let future_result = {
308 let inner = inner.read()?;
309 let store_handle = inner.store_handle.clone();
310
311 async move { store_handle.mutate(request.as_ref().clone()).await }
312 };
313
314 let weak_inner = weak_inner.clone();
315 let send_response = move |result: Result<MutationResult, Error>| -> Result<(), Error> {
316 let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
317 let inner = inner.read()?;
318
319 let resp_frame = mutation_result_to_response_frame(result)?;
320 let message = in_message.to_response_message(&inner.cell, resp_frame)?;
321
322 inner.send_message(message)?;
323
324 Ok(())
325 };
326
327 spawn_set.spawn(async move {
328 let result = future_result.await;
329
330 if let Err(err) = &result {
331 error!("Returning error executing incoming mutation: {}", err);
332 }
333
334 if let Err(err) = send_response(result) {
335 error!("Error sending response for incoming mutation: {}", err);
336 }
337 });
338
339 Ok(())
340 }
341
342 fn handle_unwatch_query(
343 weak_inner: &Weak<RwLock<Inner<CS, PS>>>,
344 token: WatchToken,
345 ) -> Result<(), Error> {
346 let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
347 let mut inner = inner.write()?;
348 inner.watched_queries.remove(&token);
349 Ok(())
350 }
351
352 fn management_timer_process(weak_inner: &Weak<RwLock<Inner<CS, PS>>>) -> Result<(), Error> {
353 let inner = weak_inner.upgrade().ok_or(Error::Dropped)?;
354 let mut inner = inner.write()?;
355
356 let timeout_duration = inner.config.watched_queries_register_timeout;
357 let mut timed_out_tokens = Vec::new();
358 for (token, watched_query) in &mut inner.watched_queries {
359 if watched_query.last_register.elapsed() > timeout_duration {
360 debug!(
361 "Watched query with token={:?} timed out after {:?}, dropping it",
362 token,
363 watched_query.last_register.elapsed(),
364 );
365 timed_out_tokens.push(*token);
366 }
367 }
368
369 for token in timed_out_tokens {
370 inner.watched_queries.remove(&token);
371 }
372
373 Ok(())
374 }
375}
376
/// Configuration of the remote store server.
#[derive(Clone, Copy)]
pub struct ServerConfiguration {
    /// Time after its last registration past which a watched query is dropped
    /// by the server's management timer.
    pub watched_queries_register_timeout: Duration,
    /// Interval at which the server's management timer runs.
    pub management_timer_interval: Duration,
}
382
383impl Default for ServerConfiguration {
384 fn default() -> Self {
385 ServerConfiguration {
386 watched_queries_register_timeout: Duration::from_secs(30),
387 management_timer_interval: Duration::from_millis(500),
388 }
389 }
390}
391
/// State of the server shared between its request handlers and the tasks they
/// spawn.
struct Inner<CS, PS>
where
    CS: exocore_chain::chain::ChainStore,
    PS: exocore_chain::pending::PendingStore,
{
    /// Server configuration; the management timer reads the watched queries
    /// timeout from it.
    config: ServerConfiguration,
    /// Cell passed along when building response messages.
    cell: Cell,
    /// Handle to the local store on which queries and mutations are executed.
    store_handle: crate::local::StoreHandle<CS, PS>,
    /// Currently registered watched queries, by watch token.
    watched_queries: HashMap<WatchToken, RegisteredWatchedQuery>,
    /// Sending end of the queue of outgoing transport events.
    transport_out_sender: mpsc::UnboundedSender<OutEvent>,
}
403
404impl<CS, PS> Inner<CS, PS>
405where
406 CS: exocore_chain::chain::ChainStore,
407 PS: exocore_chain::pending::PendingStore,
408{
409 fn send_message(&self, message: OutMessage) -> Result<(), Error> {
410 self.transport_out_sender
411 .unbounded_send(OutEvent::Message(message))
412 .map_err(|_err| {
413 Error::Fatal(anyhow!(
414 "Tried to send message, but transport_out channel is closed"
415 ))
416 })?;
417
418 Ok(())
419 }
420}
421
/// Request parsed from an incoming transport message.
enum IncomingMessage {
    /// Mutation to execute on the store.
    Mutation(Box<MutationRequest>),
    /// Query to execute once on the store.
    Query(Box<EntityQuery>),
    /// Query to register (or refresh) as a watched query.
    WatchedQuery(Box<EntityQuery>),
    /// Request to unregister the watched query with the given token.
    UnwatchQuery(WatchToken),
}
428
429impl IncomingMessage {
430 fn parse_incoming_message(in_message: &InMessage) -> Result<IncomingMessage, Error> {
431 match in_message.typ {
432 <mutation_request::Owned as MessageType>::MESSAGE_TYPE => {
433 let frame = in_message.get_data_as_framed_message()?;
434 let mutation = mutation_from_request_frame(frame)?;
435 Ok(IncomingMessage::Mutation(Box::new(mutation)))
436 }
437 <query_request::Owned as MessageType>::MESSAGE_TYPE => {
438 let frame = in_message.get_data_as_framed_message()?;
439 let query = query_from_request_frame(frame)?;
440 Ok(IncomingMessage::Query(Box::new(query)))
441 }
442 <watched_query_request::Owned as MessageType>::MESSAGE_TYPE => {
443 let frame = in_message.get_data_as_framed_message()?;
444 let query = query_from_request_frame(frame)?;
445 Ok(IncomingMessage::WatchedQuery(Box::new(query)))
446 }
447 <unwatch_query_request::Owned as MessageType>::MESSAGE_TYPE => {
448 let frame =
449 in_message.get_data_as_framed_message::<unwatch_query_request::Owned>()?;
450 let reader = frame.get_reader()?;
451 let watch_token = reader.get_token();
452 Ok(IncomingMessage::UnwatchQuery(watch_token))
453 }
454 other => Err(anyhow!("Received message of unknown type: {}", other).into()),
455 }
456 }
457}
458
/// Watched query currently registered on the server.
struct RegisteredWatchedQuery {
    /// Last time the query was registered or re-registered; compared against
    /// the configured timeout by the management timer.
    last_register: Instant,
    /// Reply token used to build responses. It is shared with the task
    /// sending the query's results and replaced on re-registration.
    reply_token: Arc<Mutex<MessageReplyToken>>,

    /// Never sent on. Dropping it together with the registration completes
    /// the receiver awaited by the task streaming the query's results.
    _drop_sender: oneshot::Sender<()>,
}
466
467impl RegisteredWatchedQuery {
468 fn set_reply_token(&self, token: MessageReplyToken) {
469 let mut reply_token = self.reply_token.lock().unwrap();
470 *reply_token = token;
471 }
472}