1use chrono::{Local, NaiveDate, TimeZone};
2use forja_core::error::{ForjaError as Error, Result};
3use forja_core::types::MemoryEntry;
4use serde::Deserialize;
5use std::fmt::Display;
6use std::path::{Path, PathBuf};
7use tokio::fs;
8use tokio::io::AsyncWriteExt;
9
/// YAML frontmatter fields deserialized from a legacy `sessions/*.md` file.
///
/// No field carries a serde default, so a file missing any of the three fails
/// to deserialize.
#[derive(Debug, Deserialize)]
struct LegacyFrontmatter {
    /// Copied verbatim into `MemoryEntry::id`.
    id: String,
    /// Copied into `MemoryEntry::timestamp`; this file formats that value as
    /// whole seconds since the Unix epoch.
    timestamp: u64,
    /// Copied verbatim into `MemoryEntry::tags`.
    tags: Vec<String>,
}
16
/// Handle to the on-disk memory log and the directories that live next to it.
///
/// Every path except `memory_file` is derived from the memory file's parent
/// directory when the value is built by `Storage::init`.
#[derive(Debug, Clone)]
pub struct Storage {
    /// The log file that entries are appended to and that summaries rewrite.
    memory_file: PathBuf,
    /// `<base>/archive`: receives one `<date>.md` file per summarized date block.
    archive_dir: PathBuf,
    /// `<base>/sessions`: legacy per-entry Markdown files imported during init.
    sessions_dir: PathBuf,
    /// `<base>/sessions.bak`: preferred rename target for `sessions_dir` after import.
    sessions_backup_dir: PathBuf,
}
24
25impl Storage {
26 pub async fn init(memory_file: impl AsRef<Path>) -> Result<Self> {
27 let memory_file = memory_file.as_ref().to_path_buf();
28 let base_dir = memory_file
29 .parent()
30 .map(Path::to_path_buf)
31 .unwrap_or_else(|| PathBuf::from("."));
32
33 fs::create_dir_all(&base_dir)
34 .await
35 .map_err(|error| Error::Storage(format!("Failed to create memory dir: {error}")))?;
36
37 let storage = Self {
38 memory_file,
39 archive_dir: base_dir.join("archive"),
40 sessions_dir: base_dir.join("sessions"),
41 sessions_backup_dir: base_dir.join("sessions.bak"),
42 };
43
44 storage.ensure_memory_file().await?;
45 storage.migrate_legacy_sessions().await?;
46
47 Ok(storage)
48 }
49
50 pub async fn append_entry(&self, entry: &MemoryEntry) -> Result<()> {
51 if should_skip_entry(entry) {
52 return Ok(());
53 }
54
55 let line = format_memory_line(entry);
56 let entry_date = format_date(entry.timestamp);
57 let block = match self.last_recorded_date().await? {
58 Some(last_date) if last_date == entry_date => line,
59 _ => format!("--- {entry_date} ---\n{line}"),
60 };
61
62 self.append_block(&block).await
63 }
64
65 pub async fn flush_and_summarize<F, E>(&self, summarizer: F) -> Result<()>
66 where
67 F: Fn(String) -> std::result::Result<String, E>,
68 E: Display,
69 {
70 let contents = self.read_all().await?;
71 if contents.trim().is_empty() {
72 return Ok(());
73 }
74
75 let sections = parse_memory_sections(&contents);
76 let today = Local::now().format("%Y-%m-%d").to_string();
77 let mut archived_blocks = Vec::new();
78 let mut rewritten_sections = Vec::with_capacity(sections.len());
79
80 for section in sections {
81 match section {
82 MemorySection::DateBlock(block)
83 if block.date != today
84 && !block.lines.is_empty()
85 && !self.archive_path(&block.date).exists() =>
86 {
87 let original_block = block.render();
88 match summarizer(original_block.clone()) {
89 Ok(summary) => {
90 if let Some(summary_lines) = normalize_summary_lines(&summary) {
91 archived_blocks.push(ArchivedBlock {
92 date: block.date.clone(),
93 contents: original_block,
94 });
95 rewritten_sections.push(MemorySection::DateBlock(DateBlock {
96 date: block.date,
97 lines: summary_lines,
98 }));
99 } else {
100 rewritten_sections.push(MemorySection::DateBlock(block));
101 }
102 }
103 Err(error) => {
104 eprintln!("[Memory] summarize skipped for {}: {error}", block.date);
105 rewritten_sections.push(MemorySection::DateBlock(block));
106 }
107 }
108 }
109 _ => rewritten_sections.push(section),
110 }
111 }
112
113 if archived_blocks.is_empty() {
114 return Ok(());
115 }
116
117 fs::create_dir_all(&self.archive_dir)
118 .await
119 .map_err(|error| Error::Storage(format!("Failed to create archive dir: {error}")))?;
120
121 for archived_block in &archived_blocks {
122 fs::write(
123 self.archive_path(&archived_block.date),
124 format_with_trailing_newline(&archived_block.contents),
125 )
126 .await
127 .map_err(|error| Error::Storage(format!("Failed to write archive file: {error}")))?;
128 }
129
130 fs::write(
131 &self.memory_file,
132 format_with_trailing_newline(&render_memory_sections(&rewritten_sections)),
133 )
134 .await
135 .map_err(|error| Error::Storage(format!("Failed to rewrite memory file: {error}")))
136 }
137
138 pub async fn read_all(&self) -> Result<String> {
139 self.ensure_memory_file().await?;
140
141 fs::read_to_string(&self.memory_file)
142 .await
143 .map_err(|error| Error::Storage(format!("Failed to read memory file: {error}")))
144 }
145
146 async fn ensure_memory_file(&self) -> Result<()> {
147 if fs::try_exists(&self.memory_file)
148 .await
149 .map_err(|error| Error::Storage(format!("Failed to inspect memory file: {error}")))?
150 {
151 return Ok(());
152 }
153
154 fs::write(&self.memory_file, "")
155 .await
156 .map_err(|error| Error::Storage(format!("Failed to create memory file: {error}")))
157 }
158
159 async fn append_block(&self, block: &str) -> Result<()> {
160 let normalized_block = block.trim_matches('\n');
161 if normalized_block.is_empty() {
162 return Ok(());
163 }
164
165 let existing = self.read_all().await?;
166 let needs_separator = !existing.is_empty() && !existing.ends_with('\n');
167
168 let mut file = fs::OpenOptions::new()
169 .create(true)
170 .append(true)
171 .open(&self.memory_file)
172 .await
173 .map_err(|error| Error::Storage(format!("Failed to open memory file: {error}")))?;
174
175 if needs_separator {
176 file.write_all(b"\n")
177 .await
178 .map_err(|error| Error::Storage(format!("Failed to append separator: {error}")))?;
179 }
180
181 file.write_all(normalized_block.as_bytes())
182 .await
183 .map_err(|error| Error::Storage(format!("Failed to append memory block: {error}")))?;
184 file.write_all(b"\n")
185 .await
186 .map_err(|error| Error::Storage(format!("Failed to finalize memory block: {error}")))
187 }
188
189 async fn last_recorded_date(&self) -> Result<Option<String>> {
190 let contents = self.read_all().await?;
191 Ok(contents.lines().rev().find_map(parse_date_header))
192 }
193
194 fn archive_path(&self, date: &str) -> PathBuf {
195 self.archive_dir.join(format!("{date}.md"))
196 }
197
198 async fn migrate_legacy_sessions(&self) -> Result<()> {
199 if !self.sessions_dir.exists() {
200 return Ok(());
201 }
202
203 let entries = self.read_legacy_entries().await?;
204 if entries.is_empty() {
205 return Ok(());
206 }
207
208 let file_count = entries.len();
209
210 for entry in &entries {
211 self.append_entry(entry).await?;
212 }
213
214 let memory_size = fs::metadata(&self.memory_file)
215 .await
216 .map_err(|error| Error::Storage(format!("Failed to read memory file metadata: {error}")))?
217 .len();
218 println!(
219 "[Memory] Migration: sessions/*.md {file_count} files -> memory.md ({})",
220 format_byte_size(memory_size)
221 );
222
223 let backup_dir = self.next_sessions_backup_dir();
224 fs::rename(&self.sessions_dir, &backup_dir)
225 .await
226 .map_err(|error| Error::Storage(format!("Failed to rename sessions dir: {error}")))?;
227
228 if backup_dir == self.sessions_backup_dir {
229 println!("[Memory] sessions/ -> sessions.bak/ completed");
230 } else if let Some(name) = backup_dir.file_name().and_then(|name| name.to_str()) {
231 println!("[Memory] sessions/ -> {name}/ completed");
232 }
233
234 Ok(())
235 }
236
237 fn next_sessions_backup_dir(&self) -> PathBuf {
238 if !self.sessions_backup_dir.exists() {
239 return self.sessions_backup_dir.clone();
240 }
241
242 let base_name = self
243 .sessions_backup_dir
244 .file_name()
245 .and_then(|name| name.to_str())
246 .unwrap_or("sessions.bak");
247
248 for index in 1.. {
249 let candidate = self.sessions_backup_dir.with_file_name(format!("{base_name}.{index}"));
250 if !candidate.exists() {
251 return candidate;
252 }
253 }
254
255 unreachable!("infinite iterator should always find an available backup path")
256 }
257
258 async fn read_legacy_entries(&self) -> Result<Vec<MemoryEntry>> {
259 let mut entries = Vec::new();
260 let mut read_dir = fs::read_dir(&self.sessions_dir)
261 .await
262 .map_err(|error| Error::Storage(format!("Failed to read sessions dir: {error}")))?;
263
264 while let Some(entry) = read_dir
265 .next_entry()
266 .await
267 .map_err(|error| Error::Storage(error.to_string()))?
268 {
269 let path = entry.path();
270 if path.is_file()
271 && path.extension().and_then(|extension| extension.to_str()) == Some("md")
272 && let Ok(memory_entry) = Self::parse_legacy_file(&path).await
273 {
274 entries.push(memory_entry);
275 }
276 }
277
278 entries.sort_by(|left, right| left.timestamp.cmp(&right.timestamp));
279 Ok(entries)
280 }
281
282 async fn parse_legacy_file(path: &Path) -> Result<MemoryEntry> {
283 let content = fs::read_to_string(path)
284 .await
285 .map_err(|error| Error::Storage(format!("Failed to read file {:?}: {error}", path)))?;
286
287 if !content.starts_with("---\n") && !content.starts_with("---\r\n") {
288 return Err(Error::Storage(format!("Invalid Frontmatter format in {:?}", path)));
289 }
290
291 let parts: Vec<&str> = content.splitn(3, "---").collect();
292 if parts.len() < 3 {
293 return Err(Error::Storage(format!(
294 "Cannot parse Markdown YAML block in {:?}",
295 path
296 )));
297 }
298
299 let frontmatter: LegacyFrontmatter = serde_yaml::from_str(parts[1].trim()).map_err(|error| {
300 Error::Deserialization(format!("YAML deserialize error: {error}"))
301 })?;
302
303 Ok(MemoryEntry {
304 id: frontmatter.id,
305 timestamp: frontmatter.timestamp,
306 tags: frontmatter.tags,
307 content: parts[2].trim().to_string(),
308 score: 0.0,
309 metadata: Default::default(),
310 })
311 }
312}
313
314fn format_memory_line(entry: &MemoryEntry) -> String {
315 let time_text = format_timestamp(entry.timestamp);
316 let role = entry_role(entry);
317 let normalized = normalize_content(&entry.content);
318 format!("{time_text} | {role} | {normalized}")
319}
320
321fn format_timestamp(timestamp: u64) -> String {
322 let local_time = Local
323 .timestamp_opt(timestamp as i64, 0)
324 .single()
325 .unwrap_or_else(|| Local.timestamp_opt(0, 0).earliest().unwrap());
326 local_time.format("%H:%M").to_string()
327}
328
329fn format_date(timestamp: u64) -> String {
330 let local_time = Local
331 .timestamp_opt(timestamp as i64, 0)
332 .single()
333 .unwrap_or_else(|| Local.timestamp_opt(0, 0).earliest().unwrap());
334 local_time.format("%Y-%m-%d").to_string()
335}
336
337fn entry_role(entry: &MemoryEntry) -> &str {
338 if let Some(role) = entry.tags.iter().find_map(|tag| match tag.as_str() {
339 "assistant" => Some("assistant"),
340 "system" => Some("system"),
341 "tool" => Some("tool"),
342 "user" => Some("user"),
343 _ => None,
344 }) {
345 return role;
346 }
347
348 if entry.id.starts_with("assistant_") {
349 return "assistant";
350 }
351 if entry.id.starts_with("system_") {
352 return "system";
353 }
354 if entry.id.starts_with("tool_") {
355 return "tool";
356 }
357 if entry.id.starts_with("user_") {
358 return "user";
359 }
360
361 "memory"
362}
363
/// Collapses every run of whitespace (including newlines) into a single
/// space and drops leading and trailing whitespace.
fn normalize_content(content: &str) -> String {
    let mut normalized = String::with_capacity(content.len());
    for word in content.split_whitespace() {
        if !normalized.is_empty() {
            normalized.push(' ');
        }
        normalized.push_str(word);
    }
    normalized
}
367
368fn should_skip_entry(entry: &MemoryEntry) -> bool {
369 entry.content.contains("MockStream")
370}
371
372fn parse_date_header(line: &str) -> Option<String> {
373 let date_text = line.strip_prefix("--- ")?.strip_suffix(" ---")?;
374 NaiveDate::parse_from_str(date_text, "%Y-%m-%d")
375 .ok()
376 .map(|_| date_text.to_string())
377}
378
/// Reduces a summary to its first three non-blank lines, each trimmed.
///
/// Returns `None` when the summary contains no non-blank line.
fn normalize_summary_lines(summary: &str) -> Option<Vec<String>> {
    const MAX_LINES: usize = 3;

    let mut kept = Vec::new();
    for raw_line in summary.lines() {
        let line = raw_line.trim();
        if line.is_empty() {
            continue;
        }
        kept.push(line.to_string());
        if kept.len() == MAX_LINES {
            break;
        }
    }

    if kept.is_empty() { None } else { Some(kept) }
}
394
395fn render_memory_sections(sections: &[MemorySection]) -> String {
396 sections
397 .iter()
398 .map(MemorySection::render)
399 .filter(|section| !section.is_empty())
400 .collect::<Vec<_>>()
401 .join("\n")
402}
403
/// Normalizes text to end with exactly one `\n`.
///
/// All trailing `\n` characters are removed first; text that is empty
/// afterwards yields an empty string with no newline.
fn format_with_trailing_newline(contents: &str) -> String {
    let body = contents.trim_end_matches('\n');

    let mut output = String::with_capacity(body.len() + 1);
    if !body.is_empty() {
        output.push_str(body);
        output.push('\n');
    }
    output
}
412
413fn parse_memory_sections(contents: &str) -> Vec<MemorySection> {
414 let mut sections = Vec::new();
415 let mut raw_lines = Vec::new();
416 let mut current_block: Option<DateBlock> = None;
417
418 for line in contents.lines() {
419 if let Some(date) = parse_date_header(line) {
420 if !raw_lines.is_empty() {
421 sections.push(MemorySection::Raw(raw_lines.join("\n")));
422 raw_lines.clear();
423 }
424
425 if let Some(block) = current_block.take() {
426 sections.push(MemorySection::DateBlock(block));
427 }
428
429 current_block = Some(DateBlock {
430 date,
431 lines: Vec::new(),
432 });
433 continue;
434 }
435
436 if let Some(block) = &mut current_block {
437 block.lines.push(line.to_string());
438 } else {
439 raw_lines.push(line.to_string());
440 }
441 }
442
443 if !raw_lines.is_empty() {
444 sections.push(MemorySection::Raw(raw_lines.join("\n")));
445 }
446
447 if let Some(block) = current_block {
448 sections.push(MemorySection::DateBlock(block));
449 }
450
451 sections
452}
453
/// Original text of a date block that was replaced by a summary, queued for
/// writing to the archive directory.
#[derive(Debug, Clone)]
struct ArchivedBlock {
    /// Date of the block; used to build the archive file name.
    date: String,
    /// Rendered block (header plus lines) as it stood before summarization.
    contents: String,
}
459
/// One dated section of the memory file: a header date and the lines under it.
#[derive(Debug, Clone)]
struct DateBlock {
    /// Date text from the header, without the surrounding `--- ` / ` ---`.
    date: String,
    /// Lines that follow the header, in file order.
    lines: Vec<String>,
}

impl DateBlock {
    /// Renders the block as `--- <date> ---` followed by each line on its own
    /// row. There is no trailing newline after the last row.
    fn render(&self) -> String {
        let mut rendered = format!("--- {} ---", self.date);
        for line in &self.lines {
            rendered.push('\n');
            rendered.push_str(line);
        }
        rendered
    }
}
475
476#[derive(Debug, Clone)]
477enum MemorySection {
478 Raw(String),
479 DateBlock(DateBlock),
480}
481
482impl MemorySection {
483 fn render(&self) -> String {
484 match self {
485 Self::Raw(contents) => contents.clone(),
486 Self::DateBlock(block) => block.render(),
487 }
488 }
489}
490
/// Formats a byte count for display: whole bytes below 1024 (`512B`),
/// otherwise one decimal place in `KB` or `MB`, using 1024 as the unit step.
fn format_byte_size(bytes: u64) -> String {
    const UNIT: f64 = 1024.0;

    match bytes {
        0..=1023 => format!("{bytes}B"),
        _ => {
            let kilobytes = bytes as f64 / UNIT;
            if kilobytes < UNIT {
                format!("{kilobytes:.1}KB")
            } else {
                format!("{:.1}MB", kilobytes / UNIT)
            }
        }
    }
}