ra_multiplex/
server.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use anyhow::{Context, Result};
4use tokio::task;
5use tracing::{error, info, info_span, warn, Instrument};
6
7use crate::client;
8use crate::config::Config;
9use crate::instance::InstanceMap;
10use crate::socketwrapper::Listener;
11
12pub async fn run(config: &Config) -> Result<()> {
13    let instance_map = InstanceMap::new(config).await;
14    let next_client_id = AtomicUsize::new(0);
15    let next_client_id = || next_client_id.fetch_add(1, Ordering::Relaxed);
16
17    let listener = Listener::bind(&config.listen).await.context("listen")?;
18    info!(socket = ?config.listen, "listening");
19    loop {
20        match listener.accept().await {
21            Ok((socket, _addr)) => {
22                let client_id = next_client_id();
23                let instance_map = instance_map.clone();
24
25                task::spawn(
26                    async move {
27                        info!("client connected");
28                        match client::process(socket, client_id, instance_map).await {
29                            Ok(_) => {}
30                            Err(err) => error!("client error: {err:?}"),
31                        }
32                    }
33                    .instrument(info_span!("client", %client_id)),
34                );
35            }
36            Err(err) => match err.kind() {
37                // ignore benign errors
38                std::io::ErrorKind::NotConnected => {
39                    warn!("listener error {err}");
40                }
41                _ => {
42                    Err(err).context("accept connection")?;
43                }
44            },
45        }
46    }
47}