Skip to main content

room_daemon/plugin/
bridge.rs

1//! Bridge between the abstract plugin framework (room-protocol) and the
2//! concrete broker internals (room-cli). This module provides the concrete
3//! [`ChatWriter`] and [`HistoryReader`] types that implement the
4//! [`MessageWriter`] and [`HistoryAccess`] traits respectively.
5//!
6//! **This is the only plugin submodule that imports from `crate::broker`.**
7//! Plugin authors never use these types directly — they receive trait objects
8//! via [`CommandContext`].
9
10use std::{
11    path::{Path, PathBuf},
12    sync::{
13        atomic::{AtomicU64, Ordering},
14        Arc,
15    },
16};
17
18use room_protocol::plugin::{
19    BoxFuture, HistoryAccess, MessageWriter, RoomMetadata, TeamAccess, UserInfo,
20};
21
22use room_protocol::{make_event, make_system, Message};
23
24use crate::{
25    broker::{
26        fanout::broadcast_and_persist,
27        state::{ClientMap, StatusMap},
28    },
29    history,
30    registry::UserRegistry,
31};
32
33// ── HistoryReader ───────────────────────────────────────────────────────────
34
/// Read-only view of a single room's chat history, scoped to one viewer.
///
/// Direct messages are only surfaced when the viewer is the sender or the
/// recipient, so a plugin invoked by user X will not see DMs exchanged
/// between Y and Z.
///
/// Implements [`HistoryAccess`] so it can be passed as
/// `Box<dyn HistoryAccess>` in [`CommandContext`].
#[derive(Debug, Clone)]
pub struct HistoryReader {
    /// Path handed to the `history` loaders on every query.
    chat_path: PathBuf,
    /// Username on whose behalf history is read; drives DM filtering.
    viewer: String,
}
46
47impl HistoryReader {
48    pub(crate) fn new(chat_path: &Path, viewer: &str) -> Self {
49        Self {
50            chat_path: chat_path.to_owned(),
51            viewer: viewer.to_owned(),
52        }
53    }
54
55    fn filter_dms(&self, messages: Vec<Message>) -> Vec<Message> {
56        messages
57            .into_iter()
58            .filter(|m| match m {
59                Message::DirectMessage { user, to, .. } => {
60                    user == &self.viewer || to == &self.viewer
61                }
62                _ => true,
63            })
64            .collect()
65    }
66}
67
68impl HistoryAccess for HistoryReader {
69    fn all(&self) -> BoxFuture<'_, anyhow::Result<Vec<Message>>> {
70        Box::pin(async {
71            let all = history::load(&self.chat_path).await?;
72            Ok(self.filter_dms(all))
73        })
74    }
75
76    fn tail(&self, n: usize) -> BoxFuture<'_, anyhow::Result<Vec<Message>>> {
77        Box::pin(async move {
78            let all = history::tail(&self.chat_path, n).await?;
79            Ok(self.filter_dms(all))
80        })
81    }
82
83    fn since(&self, message_id: &str) -> BoxFuture<'_, anyhow::Result<Vec<Message>>> {
84        let message_id = message_id.to_owned();
85        Box::pin(async move {
86            let all = history::load(&self.chat_path).await?;
87            let start = all
88                .iter()
89                .position(|m| m.id() == message_id)
90                .map(|i| i + 1)
91                .unwrap_or(0);
92            Ok(self.filter_dms(all[start..].to_vec()))
93        })
94    }
95
96    fn count(&self) -> BoxFuture<'_, anyhow::Result<usize>> {
97        Box::pin(async {
98            let all = history::load(&self.chat_path).await?;
99            Ok(all.len())
100        })
101    }
102}
103
104// ── ChatWriter ──────────────────────────────────────────────────────────────
105
106/// Short-lived scoped handle for a plugin to write messages to the chat.
107///
108/// Posts as `plugin:<name>` — plugins cannot impersonate users. The writer
109/// is valid only for the duration of [`Plugin::handle`].
110///
111/// Implements [`MessageWriter`] so it can be passed as
112/// `Box<dyn MessageWriter>` in [`CommandContext`].
113pub struct ChatWriter {
114    clients: ClientMap,
115    chat_path: Arc<PathBuf>,
116    room_id: Arc<String>,
117    seq_counter: Arc<AtomicU64>,
118    /// Identity the writer posts as (e.g. `"plugin:stats"`).
119    identity: String,
120}
121
122impl ChatWriter {
123    pub(crate) fn new(
124        clients: &ClientMap,
125        chat_path: &Arc<PathBuf>,
126        room_id: &Arc<String>,
127        seq_counter: &Arc<AtomicU64>,
128        plugin_name: &str,
129    ) -> Self {
130        Self {
131            clients: clients.clone(),
132            chat_path: chat_path.clone(),
133            room_id: room_id.clone(),
134            seq_counter: seq_counter.clone(),
135            identity: format!("plugin:{plugin_name}"),
136        }
137    }
138}
139
140impl MessageWriter for ChatWriter {
141    fn broadcast(&self, content: &str) -> BoxFuture<'_, anyhow::Result<()>> {
142        let msg = make_system(&self.room_id, &self.identity, content);
143        Box::pin(async move {
144            broadcast_and_persist(&msg, &self.clients, &self.chat_path, &self.seq_counter).await?;
145            Ok(())
146        })
147    }
148
149    fn reply_to(&self, username: &str, content: &str) -> BoxFuture<'_, anyhow::Result<()>> {
150        let msg = make_system(&self.room_id, &self.identity, content);
151        let username = username.to_owned();
152        Box::pin(async move {
153            let seq = self.seq_counter.fetch_add(1, Ordering::SeqCst) + 1;
154            let mut msg = msg;
155            msg.set_seq(seq);
156            history::append(&self.chat_path, &msg).await?;
157
158            let line = format!("{}\n", serde_json::to_string(&msg)?);
159            let map = self.clients.lock().await;
160            for (uname, tx) in map.values() {
161                if *uname == username {
162                    let _ = tx.send(line.clone());
163                }
164            }
165            Ok(())
166        })
167    }
168
169    fn emit_event(
170        &self,
171        event_type: room_protocol::EventType,
172        content: &str,
173        params: Option<serde_json::Value>,
174    ) -> BoxFuture<'_, anyhow::Result<()>> {
175        let msg = make_event(&self.room_id, &self.identity, event_type, content, params);
176        Box::pin(async move {
177            broadcast_and_persist(&msg, &self.clients, &self.chat_path, &self.seq_counter).await?;
178            Ok(())
179        })
180    }
181}
182
183// ── RoomMetadata factory ────────────────────────────────────────────────────
184
185/// Build a [`RoomMetadata`] snapshot from live broker state.
186pub(crate) async fn snapshot_metadata(
187    status_map: &StatusMap,
188    host_user: &Arc<tokio::sync::Mutex<Option<String>>>,
189    chat_path: &Path,
190) -> RoomMetadata {
191    let map = status_map.lock().await;
192    let online_users: Vec<UserInfo> = map
193        .iter()
194        .map(|(u, s)| UserInfo {
195            username: u.clone(),
196            status: s.clone(),
197        })
198        .collect();
199    drop(map);
200
201    let host = host_user.lock().await.clone();
202
203    let message_count = history::load(chat_path)
204        .await
205        .map(|msgs| msgs.len())
206        .unwrap_or(0);
207
208    RoomMetadata {
209        online_users,
210        host,
211        message_count,
212    }
213}
214
215// ── TeamChecker ─────────────────────────────────────────────────────────────
216
217/// Concrete [`TeamAccess`] backed by the daemon's [`UserRegistry`].
218///
219/// Wraps the shared registry behind `Arc<tokio::sync::Mutex<_>>`. Each method
220/// uses `try_lock()` for non-blocking access. If the lock is contended (rare —
221/// the registry lock is held only briefly by command handlers), the method
222/// returns a conservative `false`.
223pub struct TeamChecker {
224    registry: Arc<tokio::sync::Mutex<UserRegistry>>,
225}
226
227impl TeamChecker {
228    pub(crate) fn new(registry: &Arc<tokio::sync::Mutex<UserRegistry>>) -> Self {
229        Self {
230            registry: registry.clone(),
231        }
232    }
233}
234
235impl TeamAccess for TeamChecker {
236    fn team_exists(&self, team: &str) -> bool {
237        match self.registry.try_lock() {
238            Ok(reg) => reg.get_team(team).is_some(),
239            Err(_) => false,
240        }
241    }
242
243    fn is_member(&self, team: &str, user: &str) -> bool {
244        match self.registry.try_lock() {
245            Ok(reg) => reg
246                .get_team(team)
247                .map(|t| t.members.contains(user))
248                .unwrap_or(false),
249            Err(_) => false,
250        }
251    }
252}
253
#[cfg(test)]
mod tests {
    use super::*;

    /// DMs are visible to their participants only; public messages to all.
    #[tokio::test]
    async fn history_reader_filters_dms() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let chat = file.path();

        let secret = room_protocol::make_dm("r", "alice", "bob", "secret");
        let hello = room_protocol::make_message("r", "carol", "hello all");
        history::append(chat, &secret).await.unwrap();
        history::append(chat, &hello).await.unwrap();

        // Alice sent the DM, so she sees it alongside the public message.
        let alice = HistoryReader::new(chat, "alice");
        let seen_by_alice = alice.all().await.unwrap();
        assert_eq!(seen_by_alice.len(), 2);

        // Carol is not a party to the DM and only sees her own message.
        let carol = HistoryReader::new(chat, "carol");
        let seen_by_carol = carol.all().await.unwrap();
        assert_eq!(seen_by_carol.len(), 1);
        assert_eq!(seen_by_carol[0].user(), "carol");
    }

    /// `count` reports every stored message; `tail(n)` returns `n` of them
    /// when there are no DMs to filter out.
    #[tokio::test]
    async fn history_reader_tail_and_count() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let chat = file.path();

        for i in 0..5 {
            let msg = room_protocol::make_message("r", "u", format!("msg {i}"));
            history::append(chat, &msg).await.unwrap();
        }

        let reader = HistoryReader::new(chat, "u");
        assert_eq!(reader.count().await.unwrap(), 5);

        let last_three = reader.tail(3).await.unwrap();
        assert_eq!(last_three.len(), 3);
    }

    /// `since(id)` yields only the messages stored after the anchor.
    #[tokio::test]
    async fn history_reader_since() {
        let file = tempfile::NamedTempFile::new().unwrap();
        let chat = file.path();

        let first = room_protocol::make_message("r", "u", "first");
        let second = room_protocol::make_message("r", "u", "second");
        let third = room_protocol::make_message("r", "u", "third");
        let anchor = first.id().to_owned();
        history::append(chat, &first).await.unwrap();
        history::append(chat, &second).await.unwrap();
        history::append(chat, &third).await.unwrap();

        let reader = HistoryReader::new(chat, "u");
        let newer = reader.since(&anchor).await.unwrap();
        assert_eq!(newer.len(), 2);
    }
}