sos_ipc/
server.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use crate::{LocalWebService, Result, ServiceAppInfo};
use hyper::server::conn::http1::Builder;
use hyper_util::rt::tokio::TokioIo;
use interprocess::local_socket::{
    tokio::prelude::*, GenericNamespaced, ListenerOptions,
};
use sos_protocol::{Merge, SyncStorage};
use sos_sdk::prelude::{Account, AccountSwitcher};
use std::sync::Arc;
use tokio::sync::RwLock;

/// Socket server for inter-process communication.
pub struct LocalSocketServer;

impl LocalSocketServer {
    /// Listen on a named pipe.
    pub async fn listen<A, R, E>(
        socket_name: &str,
        accounts: Arc<RwLock<AccountSwitcher<A, R, E>>>,
        app_info: ServiceAppInfo,
    ) -> Result<()>
    where
        A: Account<Error = E, NetworkResult = R>
            + SyncStorage
            + Merge
            + Sync
            + Send
            + 'static,
        R: 'static,
        E: std::fmt::Debug
            + From<sos_sdk::Error>
            + From<std::io::Error>
            + 'static,
    {
        let name = socket_name.to_ns_name::<GenericNamespaced>()?;
        let opts = ListenerOptions::new().name(name);
        let listener = match opts.create_tokio() {
            Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
                tracing::error!(
                    "Error: could not start server because the socket file is occupied. Please check if {socket_name} is in use by another process and try again."
                );
                return Err(e.into());
            }
            x => x?,
        };

        let service = LocalWebService::new(app_info, accounts);
        let svc = Arc::new(service);

        loop {
            let socket = listener.accept().await?;
            let svc = svc.clone();
            tokio::spawn(async move {
                let socket = TokioIo::new(socket);
                let http = Builder::new();
                let conn = http.serve_connection(socket, svc);
                if let Err(err) = conn.await {
                    tracing::error!(
                      error = %err,
                      "ipc::server::connection");
                }
            });
        }
    }
}