pass_it_on/
server.rs

1use crate::configuration::ServerConfiguration;
2use crate::endpoints::{setup_endpoints, EndpointChannel};
3use crate::interfaces::setup_server_interfaces;
4use crate::notifications::{Notification, ValidatedNotification};
5use crate::shutdown::listen_for_shutdown;
6use crate::{Error, CHANNEL_BUFFER};
7use tracing::{debug, info, warn};
8use tokio::sync::{mpsc, watch};
9
10const DEFAULT_WAIT_FOR_SHUTDOWN_SECS: u64 = 2;
11
12/// Start the server with provided [`ServerConfiguration`].
13///
14/// Server listens for shutdown signals SIGTERM & SIGINT on Unix or CTRL-BREAK and CTRL-C on Windows.
15/// Also accepts a `Option<tokio::sync::watch::Receiver<bool>>` to shut down the client in addition to
16/// system signals.
17pub async fn start_server(
18    server_config: ServerConfiguration,
19    shutdown: Option<watch::Receiver<bool>>,
20    wait_for_shutdown_secs: Option<u64>,
21) -> Result<(), Error> {
22    // Setup channels
23    let (shutdown_tx, shutdown_rx) = watch::channel(false);
24    let (interface_tx, interface_rx) = mpsc::channel(CHANNEL_BUFFER);
25
26    // Start monitoring the configured interfaces
27    let interfaces = server_config.interfaces();
28    setup_server_interfaces(interfaces, interface_tx.clone(), shutdown_rx.clone()).await?;
29
30    // Setup endpoints to receive messages
31    let endpoints = server_config.endpoint_channels();
32    setup_endpoints(endpoints.clone(), shutdown_rx.clone()).await?;
33
34    // Monitor for messages on the interface channel
35    tokio::spawn(async move {
36        process_incoming_notifications(interface_rx, endpoints).await;
37    });
38
39    // Shutdown
40    let shutdown_secs = wait_for_shutdown_secs.unwrap_or(DEFAULT_WAIT_FOR_SHUTDOWN_SECS);
41    info!("Listening for shutdown signals");
42    listen_for_shutdown(shutdown_tx, shutdown, shutdown_secs).await;
43
44    Ok(())
45}
46
47async fn process_incoming_notifications(mut msg_rx: mpsc::Receiver<String>, endpoints: Vec<EndpointChannel>) {
48    info!("Processing Notifications");
49
50    while let Some(msg) = msg_rx.recv().await {
51        let notifications = Notification::from_json_multi(msg.as_str());
52
53        for notification in notifications {
54            match notification {
55                Ok(note) => {
56                    debug!("Notification received: {:?}", note);
57                    for endpoint in &endpoints {
58                        for (sub_name, keys) in endpoint.keys() {
59                            if note.validate_set(keys) {
60                                let channel = endpoint.channel_sender();
61                                match channel.send(ValidatedNotification::new(sub_name, note.message())) {
62                                    Ok(ok) => {
63                                        debug!("Message sent to endpoint. Subscribers: {}", ok)
64                                    }
65                                    Err(e) => warn!(
66                                        
67                                        "Error sending validated message to endpoint: {}", e
68                                    ),
69                                };
70                            }
71                        }
72                    }
73                }
74
75                Err(e) => warn!("Notification processing error: {}", e),
76            }
77        }
78    }
79}
80
/// Interactively verify devices for all Matrix endpoints in the provided [`ServerConfiguration`].
///
/// This variant is compiled only when the `matrix` feature is enabled. It hands the
/// configuration's endpoints to the Matrix `verify_devices` routine and returns its result.
#[cfg(feature = "matrix")]
pub async fn verify_matrix_devices(server_config: ServerConfiguration) -> Result<(), Error> {
    info!("Running Matrix device verification process");

    let endpoints = server_config.endpoints();
    crate::endpoints::matrix::verify::verify_devices(endpoints).await
}
89
90#[cfg(not(feature = "matrix"))]
91/// Interactively verify devices for all Matrix endpoints in the provided [`ServerConfiguration`].
92pub async fn verify_matrix_devices(server_config: ServerConfiguration) -> Result<(), Error> {
93    info!("Running Matrix device verification process");
94    Err(Error::DisabledIEndpointFeature("matrix".to_string()))
95}