simperby_network/dms/
server.rs

1use super::*;
2
3impl<S: Storage, M: DmsMessage> DistributedMessageSet<S, M> {
4    /// Runs a DMS server. This function will block the current thread.
5    pub async fn serve(
6        dms: Arc<RwLock<DistributedMessageSet<S, M>>>,
7        network_config: ServerNetworkConfig,
8    ) -> Result<(), Error> {
9        let rpc_task = async move {
10            let wrapped_dms = Arc::new(parking_lot::RwLock::new(Some(dms)));
11            struct DropHelper<T> {
12                wrapped_dms: Arc<parking_lot::RwLock<Option<Arc<RwLock<T>>>>>,
13            }
14            impl<T> Drop for DropHelper<T> {
15                fn drop(&mut self) {
16                    self.wrapped_dms.write().take().unwrap();
17                }
18            }
19            let _drop_helper = DropHelper {
20                wrapped_dms: Arc::clone(&wrapped_dms),
21            };
22            run_server(
23                network_config.port,
24                [(
25                    "dms".to_owned(),
26                    create_http_object(Arc::new(DmsWrapper { dms: wrapped_dms })
27                        as Arc<dyn DistributedMessageSetRpcInterface>),
28                )]
29                .iter()
30                .cloned()
31                .collect(),
32            )
33            .await;
34        };
35        rpc_task.await;
36        Ok(())
37    }
38
39    /// Runs a DMS client with auto-sync. This function will block the current thread.
40    pub async fn sync(
41        dms: Arc<RwLock<DistributedMessageSet<S, M>>>,
42        fetch_interval: Option<Duration>,
43        broadcast_interval: Option<Duration>,
44        network_config: ClientNetworkConfig,
45    ) -> Result<(), Error> {
46        let dms_ = Arc::clone(&dms);
47        let network_config_ = network_config.clone();
48        let fetch_task = async move {
49            if let Some(interval) = fetch_interval {
50                loop {
51                    if let Err(e) =
52                        DistributedMessageSet::<S, M>::fetch(Arc::clone(&dms_), &network_config_)
53                            .await
54                    {
55                        log::warn!("failed to parse message from the RPC-fetch: {}", e);
56                    }
57                    tokio::time::sleep(interval).await;
58                }
59            } else {
60                futures::future::pending::<()>().await;
61            }
62        };
63        let dms_ = Arc::clone(&dms);
64        let broadcast_task = async move {
65            if let Some(interval) = broadcast_interval {
66                loop {
67                    if let Err(e) =
68                        DistributedMessageSet::<S, M>::broadcast(Arc::clone(&dms_), &network_config)
69                            .await
70                    {
71                        log::warn!("failed to parse message from the RPC-broadcast: {}", e);
72                    }
73                    tokio::time::sleep(interval).await;
74                }
75            } else {
76                futures::future::pending::<()>().await;
77            }
78        };
79        join(fetch_task, broadcast_task).await;
80        Ok(())
81    }
82}