1use std::{
11 path::{Path, PathBuf},
12 sync::{
13 atomic::{AtomicU64, Ordering},
14 Arc,
15 },
16};
17
18use room_protocol::plugin::{
19 BoxFuture, HistoryAccess, MessageWriter, RoomMetadata, TeamAccess, UserInfo,
20};
21
22use room_protocol::{make_event, make_system, Message};
23
24use crate::{
25 broker::{
26 fanout::broadcast_and_persist,
27 state::{ClientMap, StatusMap},
28 },
29 history,
30 registry::UserRegistry,
31};
32
33pub struct HistoryReader {
43 chat_path: PathBuf,
44 viewer: String,
45}
46
47impl HistoryReader {
48 pub(crate) fn new(chat_path: &Path, viewer: &str) -> Self {
49 Self {
50 chat_path: chat_path.to_owned(),
51 viewer: viewer.to_owned(),
52 }
53 }
54
55 fn filter_dms(&self, messages: Vec<Message>) -> Vec<Message> {
56 messages
57 .into_iter()
58 .filter(|m| match m {
59 Message::DirectMessage { user, to, .. } => {
60 user == &self.viewer || to == &self.viewer
61 }
62 _ => true,
63 })
64 .collect()
65 }
66}
67
68impl HistoryAccess for HistoryReader {
69 fn all(&self) -> BoxFuture<'_, anyhow::Result<Vec<Message>>> {
70 Box::pin(async {
71 let all = history::load(&self.chat_path).await?;
72 Ok(self.filter_dms(all))
73 })
74 }
75
76 fn tail(&self, n: usize) -> BoxFuture<'_, anyhow::Result<Vec<Message>>> {
77 Box::pin(async move {
78 let all = history::tail(&self.chat_path, n).await?;
79 Ok(self.filter_dms(all))
80 })
81 }
82
83 fn since(&self, message_id: &str) -> BoxFuture<'_, anyhow::Result<Vec<Message>>> {
84 let message_id = message_id.to_owned();
85 Box::pin(async move {
86 let all = history::load(&self.chat_path).await?;
87 let start = all
88 .iter()
89 .position(|m| m.id() == message_id)
90 .map(|i| i + 1)
91 .unwrap_or(0);
92 Ok(self.filter_dms(all[start..].to_vec()))
93 })
94 }
95
96 fn count(&self) -> BoxFuture<'_, anyhow::Result<usize>> {
97 Box::pin(async {
98 let all = history::load(&self.chat_path).await?;
99 Ok(all.len())
100 })
101 }
102}
103
104pub struct ChatWriter {
114 clients: ClientMap,
115 chat_path: Arc<PathBuf>,
116 room_id: Arc<String>,
117 seq_counter: Arc<AtomicU64>,
118 identity: String,
120}
121
122impl ChatWriter {
123 pub(crate) fn new(
124 clients: &ClientMap,
125 chat_path: &Arc<PathBuf>,
126 room_id: &Arc<String>,
127 seq_counter: &Arc<AtomicU64>,
128 plugin_name: &str,
129 ) -> Self {
130 Self {
131 clients: clients.clone(),
132 chat_path: chat_path.clone(),
133 room_id: room_id.clone(),
134 seq_counter: seq_counter.clone(),
135 identity: format!("plugin:{plugin_name}"),
136 }
137 }
138}
139
140impl MessageWriter for ChatWriter {
141 fn broadcast(&self, content: &str) -> BoxFuture<'_, anyhow::Result<()>> {
142 let msg = make_system(&self.room_id, &self.identity, content);
143 Box::pin(async move {
144 broadcast_and_persist(&msg, &self.clients, &self.chat_path, &self.seq_counter).await?;
145 Ok(())
146 })
147 }
148
149 fn reply_to(&self, username: &str, content: &str) -> BoxFuture<'_, anyhow::Result<()>> {
150 let msg = make_system(&self.room_id, &self.identity, content);
151 let username = username.to_owned();
152 Box::pin(async move {
153 let seq = self.seq_counter.fetch_add(1, Ordering::SeqCst) + 1;
154 let mut msg = msg;
155 msg.set_seq(seq);
156 history::append(&self.chat_path, &msg).await?;
157
158 let line = format!("{}\n", serde_json::to_string(&msg)?);
159 let map = self.clients.lock().await;
160 for (uname, tx) in map.values() {
161 if *uname == username {
162 let _ = tx.send(line.clone());
163 }
164 }
165 Ok(())
166 })
167 }
168
169 fn emit_event(
170 &self,
171 event_type: room_protocol::EventType,
172 content: &str,
173 params: Option<serde_json::Value>,
174 ) -> BoxFuture<'_, anyhow::Result<()>> {
175 let msg = make_event(&self.room_id, &self.identity, event_type, content, params);
176 Box::pin(async move {
177 broadcast_and_persist(&msg, &self.clients, &self.chat_path, &self.seq_counter).await?;
178 Ok(())
179 })
180 }
181}
182
183pub(crate) async fn snapshot_metadata(
187 status_map: &StatusMap,
188 host_user: &Arc<tokio::sync::Mutex<Option<String>>>,
189 chat_path: &Path,
190) -> RoomMetadata {
191 let map = status_map.lock().await;
192 let online_users: Vec<UserInfo> = map
193 .iter()
194 .map(|(u, s)| UserInfo {
195 username: u.clone(),
196 status: s.clone(),
197 })
198 .collect();
199 drop(map);
200
201 let host = host_user.lock().await.clone();
202
203 let message_count = history::load(chat_path)
204 .await
205 .map(|msgs| msgs.len())
206 .unwrap_or(0);
207
208 RoomMetadata {
209 online_users,
210 host,
211 message_count,
212 }
213}
214
215pub struct TeamChecker {
224 registry: Arc<tokio::sync::Mutex<UserRegistry>>,
225}
226
227impl TeamChecker {
228 pub(crate) fn new(registry: &Arc<tokio::sync::Mutex<UserRegistry>>) -> Self {
229 Self {
230 registry: registry.clone(),
231 }
232 }
233}
234
235impl TeamAccess for TeamChecker {
236 fn team_exists(&self, team: &str) -> bool {
237 match self.registry.try_lock() {
238 Ok(reg) => reg.get_team(team).is_some(),
239 Err(_) => false,
240 }
241 }
242
243 fn is_member(&self, team: &str, user: &str) -> bool {
244 match self.registry.try_lock() {
245 Ok(reg) => reg
246 .get_team(team)
247 .map(|t| t.members.contains(user))
248 .unwrap_or(false),
249 Err(_) => false,
250 }
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use super::*;
257
258 #[tokio::test]
259 async fn history_reader_filters_dms() {
260 let tmp = tempfile::NamedTempFile::new().unwrap();
261 let path = tmp.path();
262
263 let dm = room_protocol::make_dm("r", "alice", "bob", "secret");
264 let public = room_protocol::make_message("r", "carol", "hello all");
265 history::append(path, &dm).await.unwrap();
266 history::append(path, &public).await.unwrap();
267
268 let reader_alice = HistoryReader::new(path, "alice");
269 let msgs = reader_alice.all().await.unwrap();
270 assert_eq!(msgs.len(), 2);
271
272 let reader_carol = HistoryReader::new(path, "carol");
273 let msgs = reader_carol.all().await.unwrap();
274 assert_eq!(msgs.len(), 1);
275 assert_eq!(msgs[0].user(), "carol");
276 }
277
278 #[tokio::test]
279 async fn history_reader_tail_and_count() {
280 let tmp = tempfile::NamedTempFile::new().unwrap();
281 let path = tmp.path();
282
283 for i in 0..5 {
284 history::append(
285 path,
286 &room_protocol::make_message("r", "u", format!("msg {i}")),
287 )
288 .await
289 .unwrap();
290 }
291
292 let reader = HistoryReader::new(path, "u");
293 assert_eq!(reader.count().await.unwrap(), 5);
294
295 let tail = reader.tail(3).await.unwrap();
296 assert_eq!(tail.len(), 3);
297 }
298
299 #[tokio::test]
300 async fn history_reader_since() {
301 let tmp = tempfile::NamedTempFile::new().unwrap();
302 let path = tmp.path();
303
304 let msg1 = room_protocol::make_message("r", "u", "first");
305 let msg2 = room_protocol::make_message("r", "u", "second");
306 let msg3 = room_protocol::make_message("r", "u", "third");
307 let id1 = msg1.id().to_owned();
308 history::append(path, &msg1).await.unwrap();
309 history::append(path, &msg2).await.unwrap();
310 history::append(path, &msg3).await.unwrap();
311
312 let reader = HistoryReader::new(path, "u");
313 let since = reader.since(&id1).await.unwrap();
314 assert_eq!(since.len(), 2);
315 }
316}