chat_system/server.rs
1//! [`ChatServer`] and [`ChatListener`] traits for server-side implementations,
2//! plus the concrete protocol-agnostic [`Server`].
3//!
4//! A **server** is a named container of listeners. It owns no address, port, or
5//! protocol of its own — those belong to the listeners. Different listeners may
6//! speak entirely different protocols (e.g. IRC on one port, WebSocket on
7//! another) while still feeding messages into the same server event loop.
8//!
9//! A **listener** is a single (protocol, address, port) combination. It handles
10//! all wire-protocol details: accepting connections, parsing inbound data into
11//! [`Message`]s, calling the message handler, and formatting replies back in the
12//! appropriate wire format.
13
14use crate::message::Message;
15use anyhow::Result;
16use async_trait::async_trait;
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20
21// ── MessageHandler ────────────────────────────────────────────────────────────
22
23/// A type-erased, cloneable, async message handler.
24///
25/// Listeners receive one of these from the server and call it for every inbound
26/// message. The return value `Option<String>` is an optional plain-text reply
27/// that the listener may format and send back in its wire protocol.
28pub type MessageHandler = Arc<
29 dyn Fn(Message) -> Pin<Box<dyn Future<Output = Result<Option<String>>> + Send>> + Send + Sync,
30>;
31
32/// Wrap a generic async closure into a [`MessageHandler`].
33///
34/// # Example
35///
36/// ```rust,ignore
37/// use chat_system::server::into_handler;
38///
39/// let h = into_handler(|msg| async move {
40/// println!("got: {}", msg.content);
41/// Ok(Some("thanks!".into()))
42/// });
43/// ```
44pub fn into_handler<F, Fut>(f: F) -> MessageHandler
45where
46 F: Fn(Message) -> Fut + Send + Sync + 'static,
47 Fut: Future<Output = Result<Option<String>>> + Send + 'static,
48{
49 Arc::new(move |msg| Box::pin(f(msg)))
50}
51
52// ── ChatListener ──────────────────────────────────────────────────────────────
53
54/// A single network endpoint: one (protocol, address, port) combination.
55///
56/// Listeners handle all wire-protocol details. When [`ChatListener::start`] is
57/// called the listener binds its address, accepts connections, parses inbound
58/// data into [`Message`]s, invokes the provided [`MessageHandler`], and sends
59/// any replies back in the appropriate wire format.
60#[async_trait]
61pub trait ChatListener: Send + Sync {
62 /// The address this listener is (or will be) bound to.
63 fn address(&self) -> &str;
64
65 /// The wire protocol this listener speaks (e.g. `"irc"`).
66 fn protocol(&self) -> &str;
67
68 /// Start accepting connections and processing messages.
69 ///
70 /// The `handler` is called for every inbound message; the optional `String`
71 /// return value is a reply that the listener formats into its wire protocol.
72 ///
73 /// The `alive` sender should be held (cloned) by every spawned task. When
74 /// all clones are dropped the server knows this listener has fully stopped.
75 async fn start(
76 &mut self,
77 handler: MessageHandler,
78 alive: tokio::sync::mpsc::Sender<()>,
79 ) -> Result<()>;
80
81 /// Stop accepting new connections and shut down all tasks.
82 async fn shutdown(&mut self) -> Result<()>;
83}
84
85// ── ChatServer ────────────────────────────────────────────────────────────────
86
87/// A protocol-agnostic chat server.
88///
89/// A server is defined by its *name* and the set of [`ChatListener`]s attached
90/// to it. It has no inherent address, port, or protocol — those are properties
91/// of the individual listeners.
92#[async_trait]
93pub trait ChatServer: Send + Sync {
94 /// Human-readable name of this server.
95 fn name(&self) -> &str;
96
97 /// Snapshot of all currently attached listeners (for introspection).
98 fn listeners(&self) -> Vec<&dyn ChatListener>;
99
100 /// Start all listeners and run the server event loop.
101 ///
102 /// Blocks until all listeners have exited (either through
103 /// [`ChatServer::shutdown`] or because they finished naturally).
104 async fn run<F, Fut>(&mut self, handler: F) -> Result<()>
105 where
106 F: Fn(Message) -> Fut + Send + Sync + 'static,
107 Fut: std::future::Future<Output = Result<Option<String>>> + Send + 'static;
108
109 /// Shut down all listeners, causing [`ChatServer::run`] to return.
110 async fn shutdown(&mut self) -> Result<()>;
111}
112
113// ── Server (concrete) ─────────────────────────────────────────────────────────
114
115/// The standard, protocol-agnostic [`ChatServer`] implementation.
116///
117/// Construct one with [`Server::new`], attach any number of listeners with
118/// [`Server::add_listener`], then call [`ChatServer::run`]:
119///
120/// ```rust,no_run
121/// use chat_system::server::Server;
122/// use chat_system::servers::IrcListener;
123/// use chat_system::ChatServer;
124///
125/// # #[tokio::main] async fn main() -> anyhow::Result<()> {
126/// let mut server = Server::new("my-server")
127/// .add_listener(IrcListener::new("0.0.0.0:6667"))
128/// .add_listener(IrcListener::new("0.0.0.0:6697"));
129///
130/// server.run(|msg| async move {
131/// println!("{}: {}", msg.sender, msg.content);
132/// Ok(Some(format!("echo: {}", msg.content)))
133/// }).await?;
134/// # Ok(()) }
135/// ```
136///
137/// With the `tls` feature enabled, you can mix plaintext and TLS listeners:
138///
139/// ```rust,ignore
140/// use chat_system::servers::{IrcListener, TlsIrcListener};
141///
142/// let server = Server::new("my-server")
143/// .add_listener(IrcListener::new("0.0.0.0:6667"))
144/// .add_listener(TlsIrcListener::new("0.0.0.0:6697", tls_config));
145/// ```
146pub struct Server {
147 name: String,
148 listeners: Vec<Box<dyn ChatListener>>,
149}
150
151impl Server {
152 /// Create a new empty server with the given name.
153 pub fn new(name: impl Into<String>) -> Self {
154 Self {
155 name: name.into(),
156 listeners: Vec::new(),
157 }
158 }
159
160 /// Attach a listener. The listener may speak any protocol.
161 ///
162 /// Listeners must be added **before** calling [`ChatServer::run`].
163 pub fn add_listener(mut self, listener: impl ChatListener + 'static) -> Self {
164 self.listeners.push(Box::new(listener));
165 self
166 }
167
168 /// Attach an already-boxed listener.
169 ///
170 /// This is useful when you already have a `Box<dyn ChatListener>`, for
171 /// example from a [`ListenerConfig::build`](crate::config::ListenerConfig::build) call.
172 /// Listeners must be added **before** calling [`ChatServer::run`].
173 pub fn add_boxed_listener(mut self, listener: Box<dyn ChatListener>) -> Self {
174 self.listeners.push(listener);
175 self
176 }
177}
178
179#[async_trait]
180impl ChatServer for Server {
181 fn name(&self) -> &str {
182 &self.name
183 }
184
185 fn listeners(&self) -> Vec<&dyn ChatListener> {
186 self.listeners.iter().map(|l| l.as_ref()).collect()
187 }
188
189 async fn run<F, Fut>(&mut self, handler: F) -> Result<()>
190 where
191 F: Fn(Message) -> Fut + Send + Sync + 'static,
192 Fut: std::future::Future<Output = Result<Option<String>>> + Send + 'static,
193 {
194 let handler: MessageHandler = into_handler(handler);
195
196 // Each listener holds a clone of `alive_tx`. When every clone is
197 // dropped (all listener tasks have exited) the `alive_rx.recv()` below
198 // returns `None` and `run()` completes.
199 let (alive_tx, mut alive_rx) = tokio::sync::mpsc::channel::<()>(1);
200
201 for listener in &mut self.listeners {
202 listener.start(handler.clone(), alive_tx.clone()).await?;
203 }
204
205 // Drop our own clone so the channel closes when all listeners stop.
206 drop(alive_tx);
207
208 // Block until all listeners have exited.
209 let _ = alive_rx.recv().await;
210
211 Ok(())
212 }
213
214 async fn shutdown(&mut self) -> Result<()> {
215 for listener in &mut self.listeners {
216 listener.shutdown().await?;
217 }
218 Ok(())
219 }
220}