1use std::path::{Path, PathBuf};
4
5use async_trait::async_trait;
6use tokio::fs::{self, OpenOptions};
7use tokio::io::{AsyncSeekExt, AsyncWriteExt};
8use tokio::sync::Mutex;
9
10use crate::MemoryResult;
11use crate::record::MemoryRecord;
12
13#[async_trait]
15pub trait Journal: Send + Sync {
16 async fn append(&self, record: &MemoryRecord) -> MemoryResult<()>;
18
19 async fn tail(&self, limit: usize) -> MemoryResult<Vec<MemoryRecord>>;
21
22 async fn clear(&self) -> MemoryResult<()>;
24}
25
26pub struct FileJournal {
28 path: PathBuf,
29 file: Mutex<tokio::fs::File>,
30}
31
32impl FileJournal {
33 pub async fn open(path: impl Into<PathBuf>) -> MemoryResult<Self> {
40 let path = path.into();
41 if let Some(parent) = path.parent() {
42 fs::create_dir_all(parent).await?;
43 }
44
45 let file = OpenOptions::new()
46 .create(true)
47 .append(true)
48 .read(true)
49 .open(&path)
50 .await?;
51
52 Ok(Self {
53 path,
54 file: Mutex::new(file),
55 })
56 }
57
58 #[must_use]
60 pub fn path(&self) -> &Path {
61 &self.path
62 }
63}
64
65#[async_trait]
66impl Journal for FileJournal {
67 async fn append(&self, record: &MemoryRecord) -> MemoryResult<()> {
68 let line = serde_json::to_vec(record)?;
69 let mut guard = self.file.lock().await;
70 guard.write_all(&line).await?;
71 guard.write_u8(b'\n').await?;
72 guard.flush().await?;
73 Ok(())
74 }
75
76 async fn tail(&self, limit: usize) -> MemoryResult<Vec<MemoryRecord>> {
77 if limit == 0 {
78 return Ok(Vec::new());
79 }
80
81 let data = fs::read(&self.path).await?;
82 if data.is_empty() {
83 return Ok(Vec::new());
84 }
85
86 let mut records = Vec::new();
87 for chunk in data
88 .split(|byte| *byte == b'\n')
89 .filter(|chunk| !chunk.is_empty())
90 {
91 let record: MemoryRecord = serde_json::from_slice(chunk)?;
92 records.push(record);
93 }
94
95 if records.len() <= limit {
96 return Ok(records);
97 }
98
99 let skip = records.len() - limit;
100 Ok(records.into_iter().skip(skip).collect())
101 }
102
103 async fn clear(&self) -> MemoryResult<()> {
104 let mut guard = self.file.lock().await;
105 guard.rewind().await?;
106 guard.set_len(0).await?;
107 guard.flush().await?;
108 Ok(())
109 }
110}
111
112#[cfg(test)]
113mod tests {
114 use super::*;
115 use bytes::Bytes;
116 use uuid::Uuid;
117
118 use crate::record::MemoryChannel;
119
120 fn temp_path() -> PathBuf {
121 let mut path = std::env::temp_dir();
122 path.push(format!("memory-journal-{}.log", Uuid::new_v4()));
123 path
124 }
125
126 #[tokio::test]
127 async fn append_and_tail_roundtrip() {
128 let path = temp_path();
129 let journal = FileJournal::open(&path).await.unwrap();
130
131 for content in ["one", "two", "three"] {
132 let record = crate::record::MemoryRecord::builder(
133 MemoryChannel::Input,
134 Bytes::from_static(content.as_bytes()),
135 )
136 .build()
137 .unwrap();
138 journal.append(&record).await.unwrap();
139 }
140
141 let tail = journal.tail(2).await.unwrap();
142 assert_eq!(tail.len(), 2);
143 assert_eq!(tail[0].payload(), &Bytes::from_static(b"two"));
144 assert_eq!(tail[1].payload(), &Bytes::from_static(b"three"));
145
146 journal.clear().await.unwrap();
147 let empty = journal.tail(10).await.unwrap();
148 assert!(empty.is_empty());
149
150 if path.exists() {
151 let _ = std::fs::remove_file(path);
152 }
153 }
154}