Skip to main content

claude_code_sdk_rust/internal/
transcript_mirror.rs

1use std::collections::BTreeMap;
2use std::path::PathBuf;
3
4use crate::error::Result;
5use crate::session_store::{
6    file_path_to_session_key, SessionStoreEntry, SessionStoreHandle, SessionSummaryEntry,
7};
8use crate::types::{ClaudeAgentOptions, Message, SessionStoreFlushMode};
9
/// Entry-count budget for the pending buffer; enqueueing past this many
/// buffered entries triggers a flush.
const MAX_PENDING_ENTRIES: usize = 500;
/// Byte budget (1 MiB of serialized entries) for the pending buffer;
/// enqueueing past this triggers a flush.
const MAX_PENDING_BYTES: usize = 1024 * 1024;
12
13#[derive(Debug, Clone)]
14struct PendingMirrorFrame {
15    file_path: PathBuf,
16    entries: Vec<SessionStoreEntry>,
17}
18
19#[derive(Debug, Clone)]
20pub struct TranscriptMirrorBatcher {
21    store: SessionStoreHandle,
22    projects_dir: PathBuf,
23    pending: Vec<PendingMirrorFrame>,
24    pending_entries: usize,
25    pending_bytes: usize,
26    flush_mode: SessionStoreFlushMode,
27    mirror_errors: Vec<Message>,
28}
29
30impl TranscriptMirrorBatcher {
31    pub fn from_options(options: &ClaudeAgentOptions) -> Option<Self> {
32        let store = options.session_store.clone()?;
33        Some(Self {
34            store,
35            projects_dir: projects_dir_for_options(options),
36            pending: Vec::new(),
37            pending_entries: 0,
38            pending_bytes: 0,
39            flush_mode: options.session_store_flush,
40            mirror_errors: Vec::new(),
41        })
42    }
43
44    pub async fn enqueue_value(&mut self, value: &serde_json::Value) -> Result<Vec<Message>> {
45        let Some(file_path) = value.get("filePath").and_then(|v| v.as_str()) else {
46            return Ok(Vec::new());
47        };
48        let entries = value
49            .get("entries")
50            .and_then(|v| v.as_array())
51            .into_iter()
52            .flatten()
53            .filter_map(|entry| entry.as_object().cloned())
54            .collect::<Vec<_>>();
55        if entries.is_empty() {
56            return Ok(Vec::new());
57        }
58
59        let bytes = serde_json::to_vec(&entries)?.len();
60        self.pending_entries += entries.len();
61        self.pending_bytes += bytes;
62        self.pending.push(PendingMirrorFrame {
63            file_path: PathBuf::from(file_path),
64            entries,
65        });
66
67        if self.flush_mode == SessionStoreFlushMode::Eager
68            || self.pending_entries > MAX_PENDING_ENTRIES
69            || self.pending_bytes > MAX_PENDING_BYTES
70        {
71            return self.flush().await;
72        }
73
74        Ok(Vec::new())
75    }
76
77    pub async fn flush(&mut self) -> Result<Vec<Message>> {
78        if self.pending.is_empty() {
79            return Ok(std::mem::take(&mut self.mirror_errors));
80        }
81
82        let frames = std::mem::take(&mut self.pending);
83        self.pending_entries = 0;
84        self.pending_bytes = 0;
85
86        let mut by_path = BTreeMap::<PathBuf, Vec<SessionStoreEntry>>::new();
87        for frame in frames {
88            by_path
89                .entry(frame.file_path)
90                .or_default()
91                .extend(frame.entries);
92        }
93
94        for (file_path, entries) in by_path {
95            let Some(key) = file_path_to_session_key(&file_path, &self.projects_dir) else {
96                continue;
97            };
98            if let Err(error) = self.store.append(key.clone(), entries).await {
99                self.mirror_errors.push(mirror_error_message(
100                    Some(file_path.to_string_lossy().to_string()),
101                    Some(key),
102                    error.to_string(),
103                ));
104            }
105        }
106
107        Ok(std::mem::take(&mut self.mirror_errors))
108    }
109
110    pub fn pending_bytes(&self) -> usize {
111        self.pending_bytes
112    }
113
114    pub fn pending_entries(&self) -> usize {
115        self.pending_entries
116    }
117}
118
119fn projects_dir_for_options(options: &ClaudeAgentOptions) -> PathBuf {
120    options
121        .env
122        .get("CLAUDE_CONFIG_DIR")
123        .map(PathBuf::from)
124        .or_else(|| dirs::home_dir().map(|home| home.join(".claude")))
125        .unwrap_or_else(|| PathBuf::from(".claude"))
126        .join("projects")
127}
128
129pub fn mirror_error_message(
130    file_path: Option<String>,
131    key: Option<crate::session_store::SessionKey>,
132    error: impl Into<String>,
133) -> Message {
134    let mut data = serde_json::Map::new();
135    let error = error.into();
136    data.insert(
137        "error".to_string(),
138        serde_json::Value::String(error.clone()),
139    );
140    if let Some(file_path) = file_path {
141        data.insert("filePath".to_string(), serde_json::Value::String(file_path));
142    }
143    let key = key.map(|key| {
144        let mut value = serde_json::Map::new();
145        value.insert(
146            "project_key".to_string(),
147            serde_json::json!(key.project_key),
148        );
149        value.insert("session_id".to_string(), serde_json::json!(key.session_id));
150        if let Some(subpath) = key.subpath {
151            value.insert("subpath".to_string(), serde_json::json!(subpath));
152        }
153        value
154    });
155    if let Some(key) = &key {
156        data.insert("key".to_string(), serde_json::Value::Object(key.clone()));
157    }
158    Message::MirrorErrorMsg(crate::types::MirrorErrorMessage { key, error, data })
159}
160
161#[allow(dead_code)]
162fn _keep_session_summary_entry_public(_: Option<SessionSummaryEntry>) {}
163
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session_store::{InMemorySessionStore, SessionKey, SessionStore};
    use async_trait::async_trait;
    use std::collections::HashMap;
    use std::path::Path;

    /// Store double whose `append` always fails and whose `load` finds nothing.
    struct FailingStore;

    #[async_trait]
    impl SessionStore for FailingStore {
        async fn append(&self, _key: SessionKey, _entries: Vec<SessionStoreEntry>) -> Result<()> {
            let failure = crate::error::ClaudeSDKError::Session("store failed".to_string());
            Err(failure)
        }

        async fn load(&self, _key: SessionKey) -> Result<Option<Vec<SessionStoreEntry>>> {
            Ok(None)
        }
    }

    /// Environment map that points `CLAUDE_CONFIG_DIR` at `config_dir`.
    fn config_env(config_dir: &Path) -> HashMap<String, String> {
        HashMap::from([(
            "CLAUDE_CONFIG_DIR".to_string(),
            config_dir.to_string_lossy().to_string(),
        )])
    }

    /// Mirror frame with a single user entry for `proj/session-1.jsonl`.
    fn mirror_frame(projects_dir: &Path, uuid: &str) -> serde_json::Value {
        let transcript_path = projects_dir.join("proj/session-1.jsonl");
        let prompt = format!("prompt {uuid}");
        serde_json::json!({
            "type": "transcript_mirror",
            "filePath": transcript_path,
            "entries": [{
                "type": "user",
                "uuid": uuid,
                "message": {"content": prompt}
            }]
        })
    }

    #[tokio::test]
    async fn batcher_coalesces_and_flushes_by_session_key() {
        let config_dir =
            std::env::temp_dir().join(format!("claude-rust-mirror-test-{}", uuid::Uuid::new_v4()));
        let projects_dir = config_dir.join("projects");
        let store = InMemorySessionStore::new();
        let options = ClaudeAgentOptions::builder()
            .env(config_env(&config_dir))
            .session_store(store.clone())
            .build();
        let mut batcher = TranscriptMirrorBatcher::from_options(&options).expect("batcher");

        // Two frames for the same transcript should stay buffered until flushed.
        for id in ["1", "2"] {
            batcher
                .enqueue_value(&mirror_frame(&projects_dir, id))
                .await
                .unwrap();
        }
        assert_eq!(batcher.pending_entries(), 2);

        let errors = batcher.flush().await.unwrap();
        assert!(errors.is_empty());

        let key = SessionKey {
            project_key: "proj".to_string(),
            session_id: "session-1".to_string(),
            subpath: None,
        };
        let stored = store.load(key).await.unwrap().unwrap();
        assert_eq!(stored.len(), 2);
        assert_eq!(stored[0]["uuid"], "1");
        assert_eq!(stored[1]["uuid"], "2");
    }

    #[tokio::test]
    async fn batcher_reports_store_failures_as_nonfatal_mirror_errors() {
        let config_dir = std::env::temp_dir().join(format!(
            "claude-rust-mirror-error-test-{}",
            uuid::Uuid::new_v4()
        ));
        let projects_dir = config_dir.join("projects");
        let options = ClaudeAgentOptions::builder()
            .env(config_env(&config_dir))
            .session_store(FailingStore)
            .build();
        let mut batcher = TranscriptMirrorBatcher::from_options(&options).expect("batcher");

        batcher
            .enqueue_value(&mirror_frame(&projects_dir, "1"))
            .await
            .unwrap();

        // The store failure must surface as a message, not as an `Err`.
        let errors = batcher.flush().await.unwrap();
        assert_eq!(errors.len(), 1);

        match &errors[0] {
            Message::MirrorErrorMsg(message) => {
                assert!(message.error.contains("store failed"));
                let project_key = message
                    .key
                    .as_ref()
                    .and_then(|object| object.get("project_key"));
                assert_eq!(project_key, Some(&serde_json::json!("proj")));
            }
            other => panic!("expected mirror_error system message, got {other:?}"),
        }
    }
}