1use crate::configuration::ServerConfiguration;
2use crate::endpoints::{setup_endpoints, EndpointChannel};
3use crate::interfaces::setup_server_interfaces;
4use crate::notifications::{Notification, ValidatedNotification};
5use crate::shutdown::listen_for_shutdown;
6use crate::{Error, CHANNEL_BUFFER};
7use tracing::{debug, info, warn};
8use tokio::sync::{mpsc, watch};
9
10const DEFAULT_WAIT_FOR_SHUTDOWN_SECS: u64 = 2;
11
12pub async fn start_server(
18 server_config: ServerConfiguration,
19 shutdown: Option<watch::Receiver<bool>>,
20 wait_for_shutdown_secs: Option<u64>,
21) -> Result<(), Error> {
22 let (shutdown_tx, shutdown_rx) = watch::channel(false);
24 let (interface_tx, interface_rx) = mpsc::channel(CHANNEL_BUFFER);
25
26 let interfaces = server_config.interfaces();
28 setup_server_interfaces(interfaces, interface_tx.clone(), shutdown_rx.clone()).await?;
29
30 let endpoints = server_config.endpoint_channels();
32 setup_endpoints(endpoints.clone(), shutdown_rx.clone()).await?;
33
34 tokio::spawn(async move {
36 process_incoming_notifications(interface_rx, endpoints).await;
37 });
38
39 let shutdown_secs = wait_for_shutdown_secs.unwrap_or(DEFAULT_WAIT_FOR_SHUTDOWN_SECS);
41 info!("Listening for shutdown signals");
42 listen_for_shutdown(shutdown_tx, shutdown, shutdown_secs).await;
43
44 Ok(())
45}
46
47async fn process_incoming_notifications(mut msg_rx: mpsc::Receiver<String>, endpoints: Vec<EndpointChannel>) {
48 info!("Processing Notifications");
49
50 while let Some(msg) = msg_rx.recv().await {
51 let notifications = Notification::from_json_multi(msg.as_str());
52
53 for notification in notifications {
54 match notification {
55 Ok(note) => {
56 debug!("Notification received: {:?}", note);
57 for endpoint in &endpoints {
58 for (sub_name, keys) in endpoint.keys() {
59 if note.validate_set(keys) {
60 let channel = endpoint.channel_sender();
61 match channel.send(ValidatedNotification::new(sub_name, note.message())) {
62 Ok(ok) => {
63 debug!("Message sent to endpoint. Subscribers: {}", ok)
64 }
65 Err(e) => warn!(
66
67 "Error sending validated message to endpoint: {}", e
68 ),
69 };
70 }
71 }
72 }
73 }
74
75 Err(e) => warn!("Notification processing error: {}", e),
76 }
77 }
78 }
79}
80
/// Runs the Matrix device verification routine (built only with the `matrix`
/// feature enabled).
///
/// Logs an informational line, then hands the endpoints from `server_config`
/// to `endpoints::matrix::verify::verify_devices` and returns its result.
///
/// # Errors
/// Returns whatever error `verify_devices` produces.
#[cfg(feature = "matrix")]
pub async fn verify_matrix_devices(server_config: ServerConfiguration) -> Result<(), Error> {
    info!("Running Matrix device verification process");

    let configured_endpoints = server_config.endpoints();
    crate::endpoints::matrix::verify::verify_devices(configured_endpoints).await
}
89
90#[cfg(not(feature = "matrix"))]
91pub async fn verify_matrix_devices(server_config: ServerConfiguration) -> Result<(), Error> {
93 info!("Running Matrix device verification process");
94 Err(Error::DisabledIEndpointFeature("matrix".to_string()))
95}