Skip to main content

room_daemon/
history.rs

1use std::{
2    io::Write,
3    path::{Path, PathBuf},
4};
5
6use room_protocol::Message;
7
/// Build the default on-disk location of a room's chat log.
///
/// The result is `/tmp/<room_id>.chat`. `room_id` is interpolated verbatim;
/// no validation or escaping is applied here.
pub fn default_chat_path(room_id: &str) -> PathBuf {
    let location: String = format!("/tmp/{room_id}.chat");
    location.into()
}
11
12/// Read all messages from the NDJSON file, skipping malformed lines.
13pub async fn load(path: &Path) -> anyhow::Result<Vec<Message>> {
14    if !path.exists() {
15        return Ok(vec![]);
16    }
17    let path = path.to_owned();
18    let raw = tokio::task::spawn_blocking(move || std::fs::read_to_string(&path))
19        .await
20        .map_err(|e| anyhow::anyhow!("blocking file read cancelled: {e}"))??;
21
22    let mut messages = Vec::new();
23    for line in raw.lines() {
24        let trimmed = line.trim();
25        if trimmed.is_empty() {
26            continue;
27        }
28        match serde_json::from_str::<Message>(trimmed) {
29            Ok(msg) => messages.push(msg),
30            Err(e) => eprintln!("history: skipping malformed line: {e}"),
31        }
32    }
33    Ok(messages)
34}
35
36/// Return the last `n` messages from the NDJSON file, skipping malformed lines.
37///
38/// Returns all messages when the file has fewer than `n` entries.
39/// Returns an empty vec if the file does not exist.
40pub async fn tail(path: &Path, n: usize) -> anyhow::Result<Vec<Message>> {
41    let all = load(path).await?;
42    let start = all.len().saturating_sub(n);
43    Ok(all[start..].to_vec())
44}
45
46/// Return the highest `seq` value from persisted messages, or 0 if the file
47/// is empty or does not exist.
48///
49/// Used on broker startup to resume sequence numbering from the last persisted
50/// message, preventing seq resets across broker restarts (bug #721).
51pub fn max_seq_from_history(path: &Path) -> u64 {
52    let raw = match std::fs::read_to_string(path) {
53        Ok(s) => s,
54        Err(_) => return 0,
55    };
56    let mut max = 0u64;
57    for line in raw.lines() {
58        let trimmed = line.trim();
59        if trimmed.is_empty() {
60            continue;
61        }
62        if let Ok(msg) = serde_json::from_str::<Message>(trimmed) {
63            if let Some(seq) = msg.seq() {
64                if seq > max {
65                    max = seq;
66                }
67            }
68        }
69    }
70    max
71}
72
73/// Append a single message as a JSON line to the NDJSON file.
74///
75/// Uses `spawn_blocking` + `std::fs::OpenOptions` directly to avoid the
76/// `tokio::fs` abstraction layer which can fail with "background task failed"
77/// when the runtime's blocking thread pool is under pressure.
78pub async fn append(path: &Path, msg: &Message) -> anyhow::Result<()> {
79    let line = format!("{}\n", serde_json::to_string(msg)?);
80    let path = path.to_owned();
81    tokio::task::spawn_blocking(move || {
82        let mut file = std::fs::OpenOptions::new()
83            .create(true)
84            .append(true)
85            .open(&path)?;
86        file.write_all(line.as_bytes())?;
87        file.flush()
88    })
89    .await
90    .map_err(|e| anyhow::anyhow!("blocking file write cancelled: {e}"))??;
91    Ok(())
92}
93
94// ── Tests ─────────────────────────────────────────────────────────────────────
95
#[cfg(test)]
mod tests {
    use super::*;
    use room_protocol::{make_join, make_leave, make_message};
    use tempfile::NamedTempFile;

    /// Collect the text of every chat message in `msgs`, ignoring other variants.
    fn chat_contents(msgs: &[Message]) -> Vec<&str> {
        msgs.iter()
            .filter_map(|m| {
                if let Message::Message { content, .. } = m {
                    Some(content.as_str())
                } else {
                    None
                }
            })
            .collect()
    }

    /// Write messages via `append`, read them back via `load`, assert equality.
    #[tokio::test]
    async fn append_then_load_round_trips_all_variants() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path();

        let msgs = vec![
            make_join("r", "alice"),
            make_message("r", "alice", "hello"),
            make_leave("r", "alice"),
        ];

        for msg in &msgs {
            append(path, msg).await.unwrap();
        }

        let loaded = load(path).await.unwrap();
        assert_eq!(loaded.len(), msgs.len());
        for (orig, loaded) in msgs.iter().zip(loaded.iter()) {
            assert_eq!(orig, loaded);
        }
    }

    #[tokio::test]
    async fn load_nonexistent_returns_empty() {
        // A path inside a fresh temp dir is guaranteed not to exist, unlike a
        // fixed name under /tmp that another process could have created.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("missing.chat");
        let result = load(&path).await.unwrap();
        assert!(result.is_empty());
    }

    #[tokio::test]
    async fn load_empty_file_returns_empty() {
        let tmp = NamedTempFile::new().unwrap();
        let result = load(tmp.path()).await.unwrap();
        assert!(result.is_empty());
    }

    #[tokio::test]
    async fn load_skips_malformed_lines_and_returns_valid_ones() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path();

        let good = make_message("r", "bob", "valid message");

        // Write one good line, one garbage line, another good line
        let raw = format!(
            "{}\n{{not valid json}}\n{}\n",
            serde_json::to_string(&good).unwrap(),
            serde_json::to_string(&good).unwrap(),
        );
        tokio::fs::write(path, raw.as_bytes()).await.unwrap();

        let loaded = load(path).await.unwrap();
        assert_eq!(loaded.len(), 2, "malformed line should be silently skipped");
        assert_eq!(loaded[0], good);
        assert_eq!(loaded[1], good);
    }

    #[tokio::test]
    async fn append_creates_file_if_not_present() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("new.chat");
        assert!(!path.exists());

        let msg = make_join("r", "alice");
        append(&path, &msg).await.unwrap();

        assert!(path.exists());
        let loaded = load(&path).await.unwrap();
        assert_eq!(loaded.len(), 1);
    }

    #[tokio::test]
    async fn append_is_incremental_not_overwriting() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path();

        for i in 0..5 {
            append(path, &make_message("r", "u", format!("msg {i}")))
                .await
                .unwrap();
        }

        let loaded = load(path).await.unwrap();
        assert_eq!(loaded.len(), 5);
    }

    #[test]
    fn max_seq_nonexistent_file_returns_zero() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("missing.chat");
        assert_eq!(max_seq_from_history(&path), 0);
    }

    #[test]
    fn max_seq_empty_file_returns_zero() {
        let tmp = NamedTempFile::new().unwrap();
        assert_eq!(max_seq_from_history(tmp.path()), 0);
    }

    #[tokio::test]
    async fn max_seq_returns_highest_seq() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path();

        // Assign seq values out of order so the maximum is not simply the
        // value on the last line of the file.
        let mut m1 = make_message("r", "alice", "first");
        m1.set_seq(5);
        let mut m2 = make_message("r", "bob", "second");
        m2.set_seq(10);
        let mut m3 = make_message("r", "carol", "third");
        m3.set_seq(7);

        append(path, &m1).await.unwrap();
        append(path, &m2).await.unwrap();
        append(path, &m3).await.unwrap();

        assert_eq!(max_seq_from_history(path), 10);
    }

    #[tokio::test]
    async fn max_seq_messages_without_seq_returns_zero() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path();

        // Messages without set_seq have seq=None
        append(path, &make_message("r", "alice", "no seq"))
            .await
            .unwrap();
        append(path, &make_join("r", "bob")).await.unwrap();

        assert_eq!(max_seq_from_history(path), 0);
    }

    #[tokio::test]
    async fn load_preserves_message_order() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path();

        let contents: Vec<&str> = vec!["first", "second", "third"];
        for c in &contents {
            append(path, &make_message("r", "u", *c)).await.unwrap();
        }

        let loaded = load(path).await.unwrap();
        assert_eq!(chat_contents(&loaded), contents);
    }

    #[tokio::test]
    async fn tail_returns_last_n_in_order() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path();

        for i in 0..5 {
            append(path, &make_message("r", "u", format!("msg {i}")))
                .await
                .unwrap();
        }

        let last_two = tail(path, 2).await.unwrap();
        assert_eq!(chat_contents(&last_two), vec!["msg 3", "msg 4"]);
    }

    #[tokio::test]
    async fn tail_returns_everything_when_n_exceeds_len() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path();

        for i in 0..3 {
            append(path, &make_message("r", "u", format!("msg {i}")))
                .await
                .unwrap();
        }

        let all = tail(path, 10).await.unwrap();
        assert_eq!(chat_contents(&all), vec!["msg 0", "msg 1", "msg 2"]);
    }

    #[tokio::test]
    async fn tail_zero_returns_empty() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path();

        append(path, &make_message("r", "u", "only")).await.unwrap();

        assert!(tail(path, 0).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn tail_nonexistent_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("missing.chat");
        assert!(tail(&path, 5).await.unwrap().is_empty());
    }

    #[test]
    fn default_chat_path_is_under_tmp_with_chat_extension() {
        assert_eq!(
            default_chat_path("lobby"),
            PathBuf::from("/tmp/lobby.chat")
        );
    }
}