Skip to main content

lellm_graph/
mutation_log.rs

//! MutationLog — a persistent audit log, independent of Checkpoint.
//!
//! Checkpoint = snapshot (fast recovery)
//! ExecutionTrace = in-memory WAL (session debugging)
//! MutationLog = persistent audit trail (optional replay)
//!
//! # Four-layer data model
//!
//! ```text
//! Runtime (AgentState)     ← working set, prompt buffer
//!       ↓ commit_batch
//! Checkpoint (Snapshot)    ← fast recovery, materialized state
//!       ↓ mutation_log.append()
//! MutationLog (audit)      ← persistent, optional replay
//!       ↓ archive
//! Conversation Archive     ← long-term storage
//! ```
18
19use std::collections::HashMap;
20use std::sync::RwLock;
21use std::time::SystemTime;
22
23use async_trait::async_trait;
24use serde::{Deserialize, Serialize};
25
26use crate::checkpoint::NodeId;
27use crate::checkpoint::{CheckpointId, CheckpointStoreError, TraceId};
28
29// ─── MutationLogEntry ──────────────────────────────────────────
30
/// A single mutation log entry — a persisted audit record.
///
/// The mutation payload is stored as a `serde_json::Value` so that the
/// storage layer does not depend on the strongly typed mutation type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MutationLogEntry {
    /// Execution trace ID.
    pub trace_id: TraceId,
    /// Step number (starting from 1).
    pub step: usize,
    /// Node identifier.
    pub node_id: NodeId,
    /// Associated checkpoint, if any.
    pub checkpoint_id: Option<CheckpointId>,
    /// Position of this mutation within its step.
    pub mutation_index: usize,
    /// The serialized mutation payload.
    pub mutation: serde_json::Value,
    /// Time at which the entry was recorded.
    pub timestamp: SystemTime,
}
52
53impl MutationLogEntry {
54    pub fn new(
55        trace_id: TraceId,
56        step: usize,
57        node_id: NodeId,
58        checkpoint_id: Option<CheckpointId>,
59        mutation_index: usize,
60        mutation: serde_json::Value,
61    ) -> Self {
62        Self {
63            trace_id,
64            step,
65            node_id,
66            checkpoint_id,
67            mutation_index,
68            mutation,
69            timestamp: SystemTime::now(),
70        }
71    }
72}
73
74// ─── MutationLogStore SPI ──────────────────────────────────────
75
/// Storage backend SPI for the mutation log.
///
/// Kept separate from `CheckpointStore` so that the two can use different
/// persistence strategies.
#[async_trait]
pub trait MutationLogStore: Send + Sync {
    /// Appends a single mutation log entry.
    async fn append(&self, entry: MutationLogEntry) -> Result<(), CheckpointStoreError>;

    /// Appends several mutation log entries.
    ///
    /// The default implementation calls [`append`](Self::append) once per
    /// entry, in iteration order, and returns at the first error. Entries
    /// appended before the failing one are not rolled back, so the default
    /// is not atomic.
    async fn append_batch(
        &self,
        entries: Vec<MutationLogEntry>,
    ) -> Result<(), CheckpointStoreError> {
        for entry in entries {
            self.append(entry).await?;
        }
        Ok(())
    }

    /// Replays the mutation log of a trace, starting from the given step.
    async fn replay(
        &self,
        trace_id: &TraceId,
        from_step: usize,
    ) -> Result<Vec<MutationLogEntry>, CheckpointStoreError>;

    /// Truncates the old log entries of a trace, keeping those from the
    /// given step onwards.
    ///
    /// The meaning of the returned count is left to the implementation; the
    /// in-memory store in this file reports how many of the trace's entries
    /// have a step below `keep_from_step`.
    async fn truncate(
        &self,
        trace_id: &TraceId,
        keep_from_step: usize,
    ) -> Result<usize, CheckpointStoreError>;
}
109
110// ─── InMemoryMutationLog ───────────────────────────────────────
111
/// In-memory mutation log implementation.
///
/// Intended for tests and development environments.
#[derive(Default)]
pub struct InMemoryMutationLog {
    /// Every stored entry; positions in this vector are what `index` refers to.
    entries: RwLock<Vec<MutationLogEntry>>,
    /// Index: trace_id → positions of that trace's entries in `entries`.
    index: RwLock<HashMap<TraceId, Vec<usize>>>,
}
121
122impl InMemoryMutationLog {
123    pub fn new() -> Self {
124        Self::default()
125    }
126
127    pub fn len(&self) -> usize {
128        self.entries.read().unwrap().len()
129    }
130
131    pub fn is_empty(&self) -> bool {
132        self.len() == 0
133    }
134}
135
136#[async_trait]
137impl MutationLogStore for InMemoryMutationLog {
138    async fn append(&self, entry: MutationLogEntry) -> Result<(), CheckpointStoreError> {
139        let trace_id = entry.trace_id;
140        let idx = {
141            let mut entries = self
142                .entries
143                .write()
144                .map_err(|e| CheckpointStoreError::Storage(e.to_string()))?;
145            let idx = entries.len();
146            entries.push(entry);
147            idx
148        };
149        {
150            let mut index_map = self
151                .index
152                .write()
153                .map_err(|e| CheckpointStoreError::Storage(e.to_string()))?;
154            index_map.entry(trace_id).or_default().push(idx);
155        }
156        Ok(())
157    }
158
159    async fn replay(
160        &self,
161        trace_id: &TraceId,
162        from_step: usize,
163    ) -> Result<Vec<MutationLogEntry>, CheckpointStoreError> {
164        let entry_indices = {
165            let index_map = self
166                .index
167                .read()
168                .map_err(|e| CheckpointStoreError::Storage(e.to_string()))?;
169            index_map.get(trace_id).cloned().unwrap_or_default()
170        };
171
172        let entries = self
173            .entries
174            .read()
175            .map_err(|e| CheckpointStoreError::Storage(e.to_string()))?;
176
177        let mut result = Vec::new();
178        for &idx in &entry_indices {
179            if idx < entries.len() {
180                let entry = &entries[idx];
181                if entry.step >= from_step {
182                    result.push(entry.clone());
183                }
184            }
185        }
186        Ok(result)
187    }
188
189    async fn truncate(
190        &self,
191        trace_id: &TraceId,
192        keep_from_step: usize,
193    ) -> Result<usize, CheckpointStoreError> {
194        let entry_indices: Vec<usize> = {
195            let index_map = self
196                .index
197                .read()
198                .map_err(|e| CheckpointStoreError::Storage(e.to_string()))?;
199            index_map.get(trace_id).cloned().unwrap_or_default()
200        };
201
202        let entries = self
203            .entries
204            .read()
205            .map_err(|e| CheckpointStoreError::Storage(e.to_string()))?;
206
207        let mut removed = 0;
208        for &idx in &entry_indices {
209            if idx < entries.len() && entries[idx].step < keep_from_step {
210                removed += 1;
211            }
212        }
213
214        // Note: In-memory truncate is soft (count only).
215        // A hard truncate would require rewriting the entries vector.
216        Ok(removed)
217    }
218}
219
220// ─── MutationLogConverter ──────────────────────────────────────
221
222/// Mutation 到 JSON 的转换器 — 供执行循环使用。
223///
224/// 将强类型 Mutation 批量转换为 MutationLogEntry。
225pub fn mutations_to_log_entries<E: Serialize>(
226    trace_id: TraceId,
227    step: usize,
228    node_id: NodeId,
229    checkpoint_id: Option<CheckpointId>,
230    mutations: impl IntoIterator<Item = E>,
231) -> Vec<MutationLogEntry> {
232    let mut result = Vec::new();
233    for (idx, mutation) in mutations.into_iter().enumerate() {
234        if let Ok(value) = serde_json::to_value(&mutation) {
235            result.push(MutationLogEntry::new(
236                trace_id,
237                step,
238                node_id.clone(),
239                checkpoint_id.clone(),
240                idx,
241                value,
242            ));
243        }
244    }
245    result
246}