Skip to main content

forja_memory/
storage.rs

1use chrono::{Local, NaiveDate, TimeZone};
2use forja_core::error::{ForjaError as Error, Result};
3use forja_core::types::MemoryEntry;
4use serde::Deserialize;
5use std::fmt::Display;
6use std::path::{Path, PathBuf};
7use tokio::fs;
8use tokio::io::AsyncWriteExt;
9
10#[derive(Debug, Deserialize)]
11struct LegacyFrontmatter {
12    id: String,
13    timestamp: u64,
14    tags: Vec<String>,
15}
16
/// Handle to the on-disk memory store.
///
/// All auxiliary paths live next to the memory file, in the same base directory.
#[derive(Debug, Clone)]
pub struct Storage {
    /// The memory log file that entries are appended to and summaries are written into.
    memory_file: PathBuf,
    /// `<base>/archive`: one `<date>.md` file per summarized day.
    archive_dir: PathBuf,
    /// `<base>/sessions`: legacy per-entry markdown files, migrated during `init`.
    sessions_dir: PathBuf,
    /// `<base>/sessions.bak`: where `sessions/` is moved after migration (a numeric suffix is
    /// appended if this path is already taken).
    sessions_backup_dir: PathBuf,
}
24
25impl Storage {
26    pub async fn init(memory_file: impl AsRef<Path>) -> Result<Self> {
27        let memory_file = memory_file.as_ref().to_path_buf();
28        let base_dir = memory_file
29            .parent()
30            .map(Path::to_path_buf)
31            .unwrap_or_else(|| PathBuf::from("."));
32
33        fs::create_dir_all(&base_dir)
34            .await
35            .map_err(|error| Error::Storage(format!("Failed to create memory dir: {error}")))?;
36
37        let storage = Self {
38            memory_file,
39            archive_dir: base_dir.join("archive"),
40            sessions_dir: base_dir.join("sessions"),
41            sessions_backup_dir: base_dir.join("sessions.bak"),
42        };
43
44        storage.ensure_memory_file().await?;
45        storage.migrate_legacy_sessions().await?;
46
47        Ok(storage)
48    }
49
50    pub async fn append_entry(&self, entry: &MemoryEntry) -> Result<()> {
51        if should_skip_entry(entry) {
52            return Ok(());
53        }
54
55        let line = format_memory_line(entry);
56        let entry_date = format_date(entry.timestamp);
57        let block = match self.last_recorded_date().await? {
58            Some(last_date) if last_date == entry_date => line,
59            _ => format!("--- {entry_date} ---\n{line}"),
60        };
61
62        self.append_block(&block).await
63    }
64
65    pub async fn flush_and_summarize<F, E>(&self, summarizer: F) -> Result<()>
66    where
67        F: Fn(String) -> std::result::Result<String, E>,
68        E: Display,
69    {
70        let contents = self.read_all().await?;
71        if contents.trim().is_empty() {
72            return Ok(());
73        }
74
75        let sections = parse_memory_sections(&contents);
76        let today = Local::now().format("%Y-%m-%d").to_string();
77        let mut archived_blocks = Vec::new();
78        let mut rewritten_sections = Vec::with_capacity(sections.len());
79
80        for section in sections {
81            match section {
82                MemorySection::DateBlock(block)
83                    if block.date != today
84                        && !block.lines.is_empty()
85                        && !self.archive_path(&block.date).exists() =>
86                {
87                    let original_block = block.render();
88                    match summarizer(original_block.clone()) {
89                        Ok(summary) => {
90                            if let Some(summary_lines) = normalize_summary_lines(&summary) {
91                                archived_blocks.push(ArchivedBlock {
92                                    date: block.date.clone(),
93                                    contents: original_block,
94                                });
95                                rewritten_sections.push(MemorySection::DateBlock(DateBlock {
96                                    date: block.date,
97                                    lines: summary_lines,
98                                }));
99                            } else {
100                                rewritten_sections.push(MemorySection::DateBlock(block));
101                            }
102                        }
103                        Err(error) => {
104                            eprintln!("[Memory] summarize skipped for {}: {error}", block.date);
105                            rewritten_sections.push(MemorySection::DateBlock(block));
106                        }
107                    }
108                }
109                _ => rewritten_sections.push(section),
110            }
111        }
112
113        if archived_blocks.is_empty() {
114            return Ok(());
115        }
116
117        fs::create_dir_all(&self.archive_dir)
118            .await
119            .map_err(|error| Error::Storage(format!("Failed to create archive dir: {error}")))?;
120
121        for archived_block in &archived_blocks {
122            fs::write(
123                self.archive_path(&archived_block.date),
124                format_with_trailing_newline(&archived_block.contents),
125            )
126            .await
127            .map_err(|error| Error::Storage(format!("Failed to write archive file: {error}")))?;
128        }
129
130        fs::write(
131            &self.memory_file,
132            format_with_trailing_newline(&render_memory_sections(&rewritten_sections)),
133        )
134        .await
135        .map_err(|error| Error::Storage(format!("Failed to rewrite memory file: {error}")))
136    }
137
138    pub async fn read_all(&self) -> Result<String> {
139        self.ensure_memory_file().await?;
140
141        fs::read_to_string(&self.memory_file)
142            .await
143            .map_err(|error| Error::Storage(format!("Failed to read memory file: {error}")))
144    }
145
146    async fn ensure_memory_file(&self) -> Result<()> {
147        if fs::try_exists(&self.memory_file)
148            .await
149            .map_err(|error| Error::Storage(format!("Failed to inspect memory file: {error}")))?
150        {
151            return Ok(());
152        }
153
154        fs::write(&self.memory_file, "")
155            .await
156            .map_err(|error| Error::Storage(format!("Failed to create memory file: {error}")))
157    }
158
159    async fn append_block(&self, block: &str) -> Result<()> {
160        let normalized_block = block.trim_matches('\n');
161        if normalized_block.is_empty() {
162            return Ok(());
163        }
164
165        let existing = self.read_all().await?;
166        let needs_separator = !existing.is_empty() && !existing.ends_with('\n');
167
168        let mut file = fs::OpenOptions::new()
169            .create(true)
170            .append(true)
171            .open(&self.memory_file)
172            .await
173            .map_err(|error| Error::Storage(format!("Failed to open memory file: {error}")))?;
174
175        if needs_separator {
176            file.write_all(b"\n")
177                .await
178                .map_err(|error| Error::Storage(format!("Failed to append separator: {error}")))?;
179        }
180
181        file.write_all(normalized_block.as_bytes())
182            .await
183            .map_err(|error| Error::Storage(format!("Failed to append memory block: {error}")))?;
184        file.write_all(b"\n")
185            .await
186            .map_err(|error| Error::Storage(format!("Failed to finalize memory block: {error}")))
187    }
188
189    async fn last_recorded_date(&self) -> Result<Option<String>> {
190        let contents = self.read_all().await?;
191        Ok(contents.lines().rev().find_map(parse_date_header))
192    }
193
194    fn archive_path(&self, date: &str) -> PathBuf {
195        self.archive_dir.join(format!("{date}.md"))
196    }
197
198    async fn migrate_legacy_sessions(&self) -> Result<()> {
199        if !self.sessions_dir.exists() {
200            return Ok(());
201        }
202
203        let entries = self.read_legacy_entries().await?;
204        if entries.is_empty() {
205            return Ok(());
206        }
207
208        let file_count = entries.len();
209
210        for entry in &entries {
211            self.append_entry(entry).await?;
212        }
213
214        let memory_size = fs::metadata(&self.memory_file)
215            .await
216            .map_err(|error| Error::Storage(format!("Failed to read memory file metadata: {error}")))?
217            .len();
218        println!(
219            "[Memory] Migration: sessions/*.md {file_count} files -> memory.md ({})",
220            format_byte_size(memory_size)
221        );
222
223        let backup_dir = self.next_sessions_backup_dir();
224        fs::rename(&self.sessions_dir, &backup_dir)
225            .await
226            .map_err(|error| Error::Storage(format!("Failed to rename sessions dir: {error}")))?;
227
228        if backup_dir == self.sessions_backup_dir {
229            println!("[Memory] sessions/ -> sessions.bak/ completed");
230        } else if let Some(name) = backup_dir.file_name().and_then(|name| name.to_str()) {
231            println!("[Memory] sessions/ -> {name}/ completed");
232        }
233
234        Ok(())
235    }
236
237    fn next_sessions_backup_dir(&self) -> PathBuf {
238        if !self.sessions_backup_dir.exists() {
239            return self.sessions_backup_dir.clone();
240        }
241
242        let base_name = self
243            .sessions_backup_dir
244            .file_name()
245            .and_then(|name| name.to_str())
246            .unwrap_or("sessions.bak");
247
248        for index in 1.. {
249            let candidate = self.sessions_backup_dir.with_file_name(format!("{base_name}.{index}"));
250            if !candidate.exists() {
251                return candidate;
252            }
253        }
254
255        unreachable!("infinite iterator should always find an available backup path")
256    }
257
258    async fn read_legacy_entries(&self) -> Result<Vec<MemoryEntry>> {
259        let mut entries = Vec::new();
260        let mut read_dir = fs::read_dir(&self.sessions_dir)
261            .await
262            .map_err(|error| Error::Storage(format!("Failed to read sessions dir: {error}")))?;
263
264        while let Some(entry) = read_dir
265            .next_entry()
266            .await
267            .map_err(|error| Error::Storage(error.to_string()))?
268        {
269            let path = entry.path();
270            if path.is_file()
271                && path.extension().and_then(|extension| extension.to_str()) == Some("md")
272                && let Ok(memory_entry) = Self::parse_legacy_file(&path).await
273            {
274                entries.push(memory_entry);
275            }
276        }
277
278        entries.sort_by(|left, right| left.timestamp.cmp(&right.timestamp));
279        Ok(entries)
280    }
281
282    async fn parse_legacy_file(path: &Path) -> Result<MemoryEntry> {
283        let content = fs::read_to_string(path)
284            .await
285            .map_err(|error| Error::Storage(format!("Failed to read file {:?}: {error}", path)))?;
286
287        if !content.starts_with("---\n") && !content.starts_with("---\r\n") {
288            return Err(Error::Storage(format!("Invalid Frontmatter format in {:?}", path)));
289        }
290
291        let parts: Vec<&str> = content.splitn(3, "---").collect();
292        if parts.len() < 3 {
293            return Err(Error::Storage(format!(
294                "Cannot parse Markdown YAML block in {:?}",
295                path
296            )));
297        }
298
299        let frontmatter: LegacyFrontmatter = serde_yaml::from_str(parts[1].trim()).map_err(|error| {
300            Error::Deserialization(format!("YAML deserialize error: {error}"))
301        })?;
302
303        Ok(MemoryEntry {
304            id: frontmatter.id,
305            timestamp: frontmatter.timestamp,
306            tags: frontmatter.tags,
307            content: parts[2].trim().to_string(),
308            score: 0.0,
309            metadata: Default::default(),
310        })
311    }
312}
313
314fn format_memory_line(entry: &MemoryEntry) -> String {
315    let time_text = format_timestamp(entry.timestamp);
316    let role = entry_role(entry);
317    let normalized = normalize_content(&entry.content);
318    format!("{time_text} | {role} | {normalized}")
319}
320
321fn format_timestamp(timestamp: u64) -> String {
322    let local_time = Local
323        .timestamp_opt(timestamp as i64, 0)
324        .single()
325        .unwrap_or_else(|| Local.timestamp_opt(0, 0).earliest().unwrap());
326    local_time.format("%H:%M").to_string()
327}
328
329fn format_date(timestamp: u64) -> String {
330    let local_time = Local
331        .timestamp_opt(timestamp as i64, 0)
332        .single()
333        .unwrap_or_else(|| Local.timestamp_opt(0, 0).earliest().unwrap());
334    local_time.format("%Y-%m-%d").to_string()
335}
336
337fn entry_role(entry: &MemoryEntry) -> &str {
338    if let Some(role) = entry.tags.iter().find_map(|tag| match tag.as_str() {
339        "assistant" => Some("assistant"),
340        "system" => Some("system"),
341        "tool" => Some("tool"),
342        "user" => Some("user"),
343        _ => None,
344    }) {
345        return role;
346    }
347
348    if entry.id.starts_with("assistant_") {
349        return "assistant";
350    }
351    if entry.id.starts_with("system_") {
352        return "system";
353    }
354    if entry.id.starts_with("tool_") {
355        return "tool";
356    }
357    if entry.id.starts_with("user_") {
358        return "user";
359    }
360
361    "memory"
362}
363
/// Collapses every run of whitespace (spaces, tabs, newlines) into a single space and drops
/// leading and trailing whitespace.
fn normalize_content(content: &str) -> String {
    let mut collapsed = String::with_capacity(content.len());
    for word in content.split_whitespace() {
        if !collapsed.is_empty() {
            collapsed.push(' ');
        }
        collapsed.push_str(word);
    }
    collapsed
}
367
368fn should_skip_entry(entry: &MemoryEntry) -> bool {
369    entry.content.contains("MockStream")
370}
371
372fn parse_date_header(line: &str) -> Option<String> {
373    let date_text = line.strip_prefix("--- ")?.strip_suffix(" ---")?;
374    NaiveDate::parse_from_str(date_text, "%Y-%m-%d")
375        .ok()
376        .map(|_| date_text.to_string())
377}
378
379fn normalize_summary_lines(summary: &str) -> Option<Vec<String>> {
380    let lines = summary
381        .lines()
382        .map(str::trim)
383        .filter(|line| !line.is_empty())
384        .take(3)
385        .map(str::to_string)
386        .collect::<Vec<_>>();
387
388    if lines.is_empty() {
389        return None;
390    }
391
392    Some(lines)
393}
394
395fn render_memory_sections(sections: &[MemorySection]) -> String {
396    sections
397        .iter()
398        .map(MemorySection::render)
399        .filter(|section| !section.is_empty())
400        .collect::<Vec<_>>()
401        .join("\n")
402}
403
/// Normalizes trailing newlines to exactly one.
///
/// Text that is empty once trailing `\n`s are stripped becomes the empty string.
fn format_with_trailing_newline(contents: &str) -> String {
    match contents.trim_end_matches('\n') {
        "" => String::new(),
        body => {
            let mut terminated = String::with_capacity(body.len() + 1);
            terminated.push_str(body);
            terminated.push('\n');
            terminated
        }
    }
}
412
413fn parse_memory_sections(contents: &str) -> Vec<MemorySection> {
414    let mut sections = Vec::new();
415    let mut raw_lines = Vec::new();
416    let mut current_block: Option<DateBlock> = None;
417
418    for line in contents.lines() {
419        if let Some(date) = parse_date_header(line) {
420            if !raw_lines.is_empty() {
421                sections.push(MemorySection::Raw(raw_lines.join("\n")));
422                raw_lines.clear();
423            }
424
425            if let Some(block) = current_block.take() {
426                sections.push(MemorySection::DateBlock(block));
427            }
428
429            current_block = Some(DateBlock {
430                date,
431                lines: Vec::new(),
432            });
433            continue;
434        }
435
436        if let Some(block) = &mut current_block {
437            block.lines.push(line.to_string());
438        } else {
439            raw_lines.push(line.to_string());
440        }
441    }
442
443    if !raw_lines.is_empty() {
444        sections.push(MemorySection::Raw(raw_lines.join("\n")));
445    }
446
447    if let Some(block) = current_block {
448        sections.push(MemorySection::DateBlock(block));
449    }
450
451    sections
452}
453
/// A summarized day's original text, to be written to `archive/<date>.md`.
#[derive(Debug, Clone)]
struct ArchivedBlock {
    /// Date used as the archive file's stem.
    date: String,
    /// The block as it read before summarization, header line included.
    contents: String,
}
459
/// One day's section of the memory file: the date from its header plus the lines below it.
#[derive(Debug, Clone)]
struct DateBlock {
    /// Date text rendered inside the `--- … ---` header.
    date: String,
    /// Body lines, each without a line terminator.
    lines: Vec<String>,
}

impl DateBlock {
    /// Renders the `--- date ---` header followed by each body line.
    ///
    /// Lines are separated by `\n`, and there is no trailing newline.
    fn render(&self) -> String {
        let mut rendered = format!("--- {} ---", self.date);
        for line in &self.lines {
            rendered.push('\n');
            rendered.push_str(line);
        }
        rendered
    }
}
475
476#[derive(Debug, Clone)]
477enum MemorySection {
478    Raw(String),
479    DateBlock(DateBlock),
480}
481
482impl MemorySection {
483    fn render(&self) -> String {
484        match self {
485            Self::Raw(contents) => contents.clone(),
486            Self::DateBlock(block) => block.render(),
487        }
488    }
489}
490
/// Formats a byte count for log messages using binary (1024-based) units.
///
/// Values under 1024 print as whole bytes (`512B`). Larger values print with one decimal
/// in `KB`, `MB`, `GB` or `TB`. The unit is promoted when the *displayed* value would reach
/// 1024, so 1 048 575 bytes prints as `1.0MB` rather than `1024.0KB`.
fn format_byte_size(bytes: u64) -> String {
    const UNITS: [&str; 4] = ["KB", "MB", "GB", "TB"];

    if bytes < 1024 {
        return format!("{bytes}B");
    }

    let mut value = bytes as f64 / 1024.0;
    let mut unit = 0;
    // Compare the value rounded to one decimal, matching what `{:.1}` will print.
    while unit + 1 < UNITS.len() && (value * 10.0).round() >= 10_240.0 {
        value /= 1024.0;
        unit += 1;
    }

    format!("{value:.1}{}", UNITS[unit])
}