Skip to main content

chat_system/
server.rs

1//! [`ChatServer`] and [`ChatListener`] traits for server-side implementations,
2//! plus the concrete protocol-agnostic [`Server`].
3//!
4//! A **server** is a named container of listeners.  It owns no address, port, or
5//! protocol of its own — those belong to the listeners.  Different listeners may
6//! speak entirely different protocols (e.g. IRC on one port, WebSocket on
7//! another) while still feeding messages into the same server event loop.
8//!
9//! A **listener** is a single (protocol, address, port) combination.  It handles
10//! all wire-protocol details: accepting connections, parsing inbound data into
11//! [`Message`]s, calling the message handler, and formatting replies back in the
12//! appropriate wire format.
13
14use crate::message::Message;
15use anyhow::Result;
16use async_trait::async_trait;
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20
21// ── MessageHandler ────────────────────────────────────────────────────────────
22
23/// A type-erased, cloneable, async message handler.
24///
25/// Listeners receive one of these from the server and call it for every inbound
26/// message.  The return value `Option<String>` is an optional plain-text reply
27/// that the listener may format and send back in its wire protocol.
28pub type MessageHandler = Arc<
29    dyn Fn(Message) -> Pin<Box<dyn Future<Output = Result<Option<String>>> + Send>> + Send + Sync,
30>;
31
32/// Wrap a generic async closure into a [`MessageHandler`].
33///
34/// # Example
35///
36/// ```rust,ignore
37/// use chat_system::server::into_handler;
38///
39/// let h = into_handler(|msg| async move {
40///     println!("got: {}", msg.content);
41///     Ok(Some("thanks!".into()))
42/// });
43/// ```
44pub fn into_handler<F, Fut>(f: F) -> MessageHandler
45where
46    F: Fn(Message) -> Fut + Send + Sync + 'static,
47    Fut: Future<Output = Result<Option<String>>> + Send + 'static,
48{
49    Arc::new(move |msg| Box::pin(f(msg)))
50}
51
52// ── ChatListener ──────────────────────────────────────────────────────────────
53
54/// A single network endpoint: one (protocol, address, port) combination.
55///
56/// Listeners handle all wire-protocol details.  When [`ChatListener::start`] is
57/// called the listener binds its address, accepts connections, parses inbound
58/// data into [`Message`]s, invokes the provided [`MessageHandler`], and sends
59/// any replies back in the appropriate wire format.
60#[async_trait]
61pub trait ChatListener: Send + Sync {
62    /// The address this listener is (or will be) bound to.
63    fn address(&self) -> &str;
64
65    /// The wire protocol this listener speaks (e.g. `"irc"`).
66    fn protocol(&self) -> &str;
67
68    /// Start accepting connections and processing messages.
69    ///
70    /// The `handler` is called for every inbound message; the optional `String`
71    /// return value is a reply that the listener formats into its wire protocol.
72    ///
73    /// The `alive` sender should be held (cloned) by every spawned task.  When
74    /// all clones are dropped the server knows this listener has fully stopped.
75    async fn start(
76        &mut self,
77        handler: MessageHandler,
78        alive: tokio::sync::mpsc::Sender<()>,
79    ) -> Result<()>;
80
81    /// Stop accepting new connections and shut down all tasks.
82    async fn shutdown(&mut self) -> Result<()>;
83}
84
85// ── ChatServer ────────────────────────────────────────────────────────────────
86
87/// A protocol-agnostic chat server.
88///
89/// A server is defined by its *name* and the set of [`ChatListener`]s attached
90/// to it.  It has no inherent address, port, or protocol — those are properties
91/// of the individual listeners.
92#[async_trait]
93pub trait ChatServer: Send + Sync {
94    /// Human-readable name of this server.
95    fn name(&self) -> &str;
96
97    /// Snapshot of all currently attached listeners (for introspection).
98    fn listeners(&self) -> Vec<&dyn ChatListener>;
99
100    /// Start all listeners and run the server event loop.
101    ///
102    /// Blocks until all listeners have exited (either through
103    /// [`ChatServer::shutdown`] or because they finished naturally).
104    async fn run<F, Fut>(&mut self, handler: F) -> Result<()>
105    where
106        F: Fn(Message) -> Fut + Send + Sync + 'static,
107        Fut: std::future::Future<Output = Result<Option<String>>> + Send + 'static;
108
109    /// Shut down all listeners, causing [`ChatServer::run`] to return.
110    async fn shutdown(&mut self) -> Result<()>;
111}
112
113// ── Server (concrete) ─────────────────────────────────────────────────────────
114
115/// The standard, protocol-agnostic [`ChatServer`] implementation.
116///
117/// Construct one with [`Server::new`], attach any number of listeners with
118/// [`Server::add_listener`], then call [`ChatServer::run`]:
119///
120/// ```rust,no_run
121/// use chat_system::server::Server;
122/// use chat_system::servers::IrcListener;
123/// use chat_system::ChatServer;
124///
125/// # #[tokio::main] async fn main() -> anyhow::Result<()> {
126/// let mut server = Server::new("my-server")
127///     .add_listener(IrcListener::new("0.0.0.0:6667"))
128///     .add_listener(IrcListener::new("0.0.0.0:6697"));
129///
130/// server.run(|msg| async move {
131///     println!("{}: {}", msg.sender, msg.content);
132///     Ok(Some(format!("echo: {}", msg.content)))
133/// }).await?;
134/// # Ok(()) }
135/// ```
136///
137/// With the `tls` feature enabled, you can mix plaintext and TLS listeners:
138///
139/// ```rust,ignore
140/// use chat_system::servers::{IrcListener, TlsIrcListener};
141///
142/// let server = Server::new("my-server")
143///     .add_listener(IrcListener::new("0.0.0.0:6667"))
144///     .add_listener(TlsIrcListener::new("0.0.0.0:6697", tls_config));
145/// ```
146pub struct Server {
147    name: String,
148    listeners: Vec<Box<dyn ChatListener>>,
149}
150
151impl Server {
152    /// Create a new empty server with the given name.
153    pub fn new(name: impl Into<String>) -> Self {
154        Self {
155            name: name.into(),
156            listeners: Vec::new(),
157        }
158    }
159
160    /// Attach a listener.  The listener may speak any protocol.
161    ///
162    /// Listeners must be added **before** calling [`ChatServer::run`].
163    pub fn add_listener(mut self, listener: impl ChatListener + 'static) -> Self {
164        self.listeners.push(Box::new(listener));
165        self
166    }
167
168    /// Attach an already-boxed listener.
169    ///
170    /// This is useful when you already have a `Box<dyn ChatListener>`, for
171    /// example from a [`ListenerConfig::build`](crate::config::ListenerConfig::build) call.
172    /// Listeners must be added **before** calling [`ChatServer::run`].
173    pub fn add_boxed_listener(mut self, listener: Box<dyn ChatListener>) -> Self {
174        self.listeners.push(listener);
175        self
176    }
177}
178
179#[async_trait]
180impl ChatServer for Server {
181    fn name(&self) -> &str {
182        &self.name
183    }
184
185    fn listeners(&self) -> Vec<&dyn ChatListener> {
186        self.listeners.iter().map(|l| l.as_ref()).collect()
187    }
188
189    async fn run<F, Fut>(&mut self, handler: F) -> Result<()>
190    where
191        F: Fn(Message) -> Fut + Send + Sync + 'static,
192        Fut: std::future::Future<Output = Result<Option<String>>> + Send + 'static,
193    {
194        let handler: MessageHandler = into_handler(handler);
195
196        // Each listener holds a clone of `alive_tx`.  When every clone is
197        // dropped (all listener tasks have exited) the `alive_rx.recv()` below
198        // returns `None` and `run()` completes.
199        let (alive_tx, mut alive_rx) = tokio::sync::mpsc::channel::<()>(1);
200
201        for listener in &mut self.listeners {
202            listener.start(handler.clone(), alive_tx.clone()).await?;
203        }
204
205        // Drop our own clone so the channel closes when all listeners stop.
206        drop(alive_tx);
207
208        // Block until all listeners have exited.
209        let _ = alive_rx.recv().await;
210
211        Ok(())
212    }
213
214    async fn shutdown(&mut self) -> Result<()> {
215        for listener in &mut self.listeners {
216            listener.shutdown().await?;
217        }
218        Ok(())
219    }
220}