// simperby_network/dms/server.rs
use super::*;

3impl<S: Storage, M: DmsMessage> DistributedMessageSet<S, M> {
4 pub async fn serve(
6 dms: Arc<RwLock<DistributedMessageSet<S, M>>>,
7 network_config: ServerNetworkConfig,
8 ) -> Result<(), Error> {
9 let rpc_task = async move {
10 let wrapped_dms = Arc::new(parking_lot::RwLock::new(Some(dms)));
11 struct DropHelper<T> {
12 wrapped_dms: Arc<parking_lot::RwLock<Option<Arc<RwLock<T>>>>>,
13 }
14 impl<T> Drop for DropHelper<T> {
15 fn drop(&mut self) {
16 self.wrapped_dms.write().take().unwrap();
17 }
18 }
19 let _drop_helper = DropHelper {
20 wrapped_dms: Arc::clone(&wrapped_dms),
21 };
22 run_server(
23 network_config.port,
24 [(
25 "dms".to_owned(),
26 create_http_object(Arc::new(DmsWrapper { dms: wrapped_dms })
27 as Arc<dyn DistributedMessageSetRpcInterface>),
28 )]
29 .iter()
30 .cloned()
31 .collect(),
32 )
33 .await;
34 };
35 rpc_task.await;
36 Ok(())
37 }
38
39 pub async fn sync(
41 dms: Arc<RwLock<DistributedMessageSet<S, M>>>,
42 fetch_interval: Option<Duration>,
43 broadcast_interval: Option<Duration>,
44 network_config: ClientNetworkConfig,
45 ) -> Result<(), Error> {
46 let dms_ = Arc::clone(&dms);
47 let network_config_ = network_config.clone();
48 let fetch_task = async move {
49 if let Some(interval) = fetch_interval {
50 loop {
51 if let Err(e) =
52 DistributedMessageSet::<S, M>::fetch(Arc::clone(&dms_), &network_config_)
53 .await
54 {
55 log::warn!("failed to parse message from the RPC-fetch: {}", e);
56 }
57 tokio::time::sleep(interval).await;
58 }
59 } else {
60 futures::future::pending::<()>().await;
61 }
62 };
63 let dms_ = Arc::clone(&dms);
64 let broadcast_task = async move {
65 if let Some(interval) = broadcast_interval {
66 loop {
67 if let Err(e) =
68 DistributedMessageSet::<S, M>::broadcast(Arc::clone(&dms_), &network_config)
69 .await
70 {
71 log::warn!("failed to parse message from the RPC-broadcast: {}", e);
72 }
73 tokio::time::sleep(interval).await;
74 }
75 } else {
76 futures::future::pending::<()>().await;
77 }
78 };
79 join(fetch_task, broadcast_task).await;
80 Ok(())
81 }
82}