allsource_core/infrastructure/persistence/
cold_tier.rs1use crate::{
30 domain::entities::Event,
31 error::{AllSourceError, Result},
32 infrastructure::persistence::storage::ParquetStorage,
33};
34use chrono::{DateTime, Utc};
35use std::{
36 fmt,
37 path::{Path, PathBuf},
38 sync::Arc,
39};
40
41pub trait ArchiveTarget: Send + Sync + fmt::Debug {
53 fn archive(
57 &self,
58 tenant_id: &str,
59 from: DateTime<Utc>,
60 to: DateTime<Utc>,
61 events: &[Event],
62 ) -> Result<()>;
63
64 fn description(&self) -> String {
67 format!("{self:?}")
68 }
69}
70
71pub struct LocalFsArchive {
80 storage: Arc<ParquetStorage>,
84 root: PathBuf,
87}
88
89impl LocalFsArchive {
90 pub fn new(root: impl Into<PathBuf>) -> Result<Self> {
95 let root: PathBuf = root.into();
96 let storage = ParquetStorage::new(&root).map_err(|e| {
97 AllSourceError::StorageError(format!(
98 "cold-tier archive: failed to open ParquetStorage at {}: {e}",
99 root.display()
100 ))
101 })?;
102 Ok(Self {
103 storage: Arc::new(storage),
104 root,
105 })
106 }
107
108 pub fn root(&self) -> &Path {
110 &self.root
111 }
112}
113
114impl fmt::Debug for LocalFsArchive {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 f.debug_struct("LocalFsArchive")
117 .field("root", &self.root)
118 .finish()
119 }
120}
121
122impl ArchiveTarget for LocalFsArchive {
123 fn archive(
124 &self,
125 tenant_id: &str,
126 from: DateTime<Utc>,
127 to: DateTime<Utc>,
128 events: &[Event],
129 ) -> Result<()> {
130 if events.is_empty() {
131 return Ok(());
132 }
133
134 let file_stem = format!(
137 "archive.{tenant_id}.{}-{}",
138 super::compaction::format_iso_basic(from),
139 super::compaction::format_iso_basic(to)
140 );
141 let path = self
142 .storage
143 .write_atomic_parquet(tenant_id, &file_stem, events)?;
144 tracing::info!(
145 tenant_id = tenant_id,
146 archive_path = %path.display(),
147 events = events.len(),
148 from = %from.to_rfc3339(),
149 to = %to.to_rfc3339(),
150 "cold-tier archive: wrote dropped events"
151 );
152 Ok(())
153 }
154
155 fn description(&self) -> String {
156 format!("local-fs:{}", self.root.display())
157 }
158}
159
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use chrono::Duration;
    use serde_json::json;
    use tempfile::TempDir;
    use uuid::Uuid;

    /// Builds a minimal `test.event` for `tenant`, stamped at `ts`.
    fn make_event(tenant: &str, ts: DateTime<Utc>) -> Event {
        Event::reconstruct_from_strings(
            Uuid::new_v4(),
            "test.event".to_string(),
            "entity-1".to_string(),
            tenant.to_string(),
            json!({"x": 1}),
            ts,
            None,
            1,
        )
    }

    /// Recursively collects every `*.parquet` file beneath `root`.
    ///
    /// Panics if a directory cannot be listed. Individual entries that fail
    /// to read are skipped.
    fn parquet_files(root: &Path) -> Vec<PathBuf> {
        let mut found = Vec::new();
        let mut pending = vec![root.to_path_buf()];
        while let Some(dir) = pending.pop() {
            for entry in std::fs::read_dir(&dir).unwrap().flatten() {
                let path = entry.path();
                if path.is_dir() {
                    pending.push(path);
                } else if path.extension().is_some_and(|ext| ext == "parquet") {
                    found.push(path);
                }
            }
        }
        found
    }

    #[test]
    fn test_local_fs_archive_writes_one_file() {
        let dir = TempDir::new().unwrap();
        let archive = LocalFsArchive::new(dir.path()).unwrap();

        let now = Utc::now();
        let events: Vec<_> = (0..10)
            .map(|i| make_event("acme", now - Duration::days(i)))
            .collect();
        let from = events.iter().map(|e| e.timestamp).min().unwrap();
        let to = events.iter().map(|e| e.timestamp).max().unwrap();

        archive.archive("acme", from, to, &events).unwrap();

        let found = parquet_files(dir.path());
        assert_eq!(found.len(), 1, "exactly one archive file");
        let archived = &found[0];

        let name = archived.file_name().unwrap().to_string_lossy();
        assert!(
            name.starts_with("archive.acme."),
            "filename starts with archive.<tenant>., got {name}"
        );

        // Compare path components instead of a "/acme/" substring so the check
        // also holds with Windows separators. Only look below the temp root, so
        // the temp dir's own name cannot make the check pass by accident.
        let relative = archived.strip_prefix(dir.path()).unwrap();
        assert!(
            relative.components().any(|c| c.as_os_str() == "acme"),
            "path contains tenant subdir: {}",
            archived.display()
        );
    }

    #[test]
    fn test_local_fs_archive_empty_batch_is_noop() {
        let dir = TempDir::new().unwrap();
        let archive = LocalFsArchive::new(dir.path()).unwrap();
        let now = Utc::now();
        archive.archive("acme", now, now, &[]).unwrap();

        let found = parquet_files(dir.path());
        assert!(found.is_empty(), "expected no archive files, got {found:?}");
    }

    /// Archiving the same window twice must succeed. Whether the second write
    /// replaces the first file or lands beside it depends on
    /// `write_atomic_parquet`, so either one or two files is accepted.
    #[test]
    fn test_local_fs_archive_idempotent_on_same_window() {
        let dir = TempDir::new().unwrap();
        let archive = LocalFsArchive::new(dir.path()).unwrap();

        let now = Utc::now();
        let events: Vec<_> = (0..3)
            .map(|i| make_event("acme", now - Duration::hours(i)))
            .collect();
        let from = events.iter().map(|e| e.timestamp).min().unwrap();
        let to = events.iter().map(|e| e.timestamp).max().unwrap();

        archive.archive("acme", from, to, &events).unwrap();
        archive.archive("acme", from, to, &events).unwrap();

        let count = parquet_files(dir.path()).len();
        assert!(matches!(count, 1 | 2), "got {count} archive files");
    }

    #[test]
    fn test_archive_target_description_includes_root() {
        let dir = TempDir::new().unwrap();
        let archive = LocalFsArchive::new(dir.path()).unwrap();
        let desc = archive.description();
        assert!(desc.starts_with("local-fs:"), "got {desc}");
        assert!(
            desc.contains(&dir.path().display().to_string()),
            "got {desc}"
        );
    }
}
296}