claude_code_sdk_rust/internal/
transcript_mirror.rs1use std::collections::BTreeMap;
2use std::path::PathBuf;
3
4use crate::error::Result;
5use crate::session_store::{
6 file_path_to_session_key, SessionStoreEntry, SessionStoreHandle, SessionSummaryEntry,
7};
8use crate::types::{ClaudeAgentOptions, Message, SessionStoreFlushMode};
9
/// Buffered entry count above which `enqueue_value` forces a flush.
const MAX_PENDING_ENTRIES: usize = 500;
/// Buffered serialized size in bytes (1 MiB) above which `enqueue_value` forces a flush.
const MAX_PENDING_BYTES: usize = 1024 * 1024;
12
13#[derive(Debug, Clone)]
14struct PendingMirrorFrame {
15 file_path: PathBuf,
16 entries: Vec<SessionStoreEntry>,
17}
18
19#[derive(Debug, Clone)]
20pub struct TranscriptMirrorBatcher {
21 store: SessionStoreHandle,
22 projects_dir: PathBuf,
23 pending: Vec<PendingMirrorFrame>,
24 pending_entries: usize,
25 pending_bytes: usize,
26 flush_mode: SessionStoreFlushMode,
27 mirror_errors: Vec<Message>,
28}
29
30impl TranscriptMirrorBatcher {
31 pub fn from_options(options: &ClaudeAgentOptions) -> Option<Self> {
32 let store = options.session_store.clone()?;
33 Some(Self {
34 store,
35 projects_dir: projects_dir_for_options(options),
36 pending: Vec::new(),
37 pending_entries: 0,
38 pending_bytes: 0,
39 flush_mode: options.session_store_flush,
40 mirror_errors: Vec::new(),
41 })
42 }
43
44 pub async fn enqueue_value(&mut self, value: &serde_json::Value) -> Result<Vec<Message>> {
45 let Some(file_path) = value.get("filePath").and_then(|v| v.as_str()) else {
46 return Ok(Vec::new());
47 };
48 let entries = value
49 .get("entries")
50 .and_then(|v| v.as_array())
51 .into_iter()
52 .flatten()
53 .filter_map(|entry| entry.as_object().cloned())
54 .collect::<Vec<_>>();
55 if entries.is_empty() {
56 return Ok(Vec::new());
57 }
58
59 let bytes = serde_json::to_vec(&entries)?.len();
60 self.pending_entries += entries.len();
61 self.pending_bytes += bytes;
62 self.pending.push(PendingMirrorFrame {
63 file_path: PathBuf::from(file_path),
64 entries,
65 });
66
67 if self.flush_mode == SessionStoreFlushMode::Eager
68 || self.pending_entries > MAX_PENDING_ENTRIES
69 || self.pending_bytes > MAX_PENDING_BYTES
70 {
71 return self.flush().await;
72 }
73
74 Ok(Vec::new())
75 }
76
77 pub async fn flush(&mut self) -> Result<Vec<Message>> {
78 if self.pending.is_empty() {
79 return Ok(std::mem::take(&mut self.mirror_errors));
80 }
81
82 let frames = std::mem::take(&mut self.pending);
83 self.pending_entries = 0;
84 self.pending_bytes = 0;
85
86 let mut by_path = BTreeMap::<PathBuf, Vec<SessionStoreEntry>>::new();
87 for frame in frames {
88 by_path
89 .entry(frame.file_path)
90 .or_default()
91 .extend(frame.entries);
92 }
93
94 for (file_path, entries) in by_path {
95 let Some(key) = file_path_to_session_key(&file_path, &self.projects_dir) else {
96 continue;
97 };
98 if let Err(error) = self.store.append(key.clone(), entries).await {
99 self.mirror_errors.push(mirror_error_message(
100 Some(file_path.to_string_lossy().to_string()),
101 Some(key),
102 error.to_string(),
103 ));
104 }
105 }
106
107 Ok(std::mem::take(&mut self.mirror_errors))
108 }
109
110 pub fn pending_bytes(&self) -> usize {
111 self.pending_bytes
112 }
113
114 pub fn pending_entries(&self) -> usize {
115 self.pending_entries
116 }
117}
118
119fn projects_dir_for_options(options: &ClaudeAgentOptions) -> PathBuf {
120 options
121 .env
122 .get("CLAUDE_CONFIG_DIR")
123 .map(PathBuf::from)
124 .or_else(|| dirs::home_dir().map(|home| home.join(".claude")))
125 .unwrap_or_else(|| PathBuf::from(".claude"))
126 .join("projects")
127}
128
129pub fn mirror_error_message(
130 file_path: Option<String>,
131 key: Option<crate::session_store::SessionKey>,
132 error: impl Into<String>,
133) -> Message {
134 let mut data = serde_json::Map::new();
135 let error = error.into();
136 data.insert(
137 "error".to_string(),
138 serde_json::Value::String(error.clone()),
139 );
140 if let Some(file_path) = file_path {
141 data.insert("filePath".to_string(), serde_json::Value::String(file_path));
142 }
143 let key = key.map(|key| {
144 let mut value = serde_json::Map::new();
145 value.insert(
146 "project_key".to_string(),
147 serde_json::json!(key.project_key),
148 );
149 value.insert("session_id".to_string(), serde_json::json!(key.session_id));
150 if let Some(subpath) = key.subpath {
151 value.insert("subpath".to_string(), serde_json::json!(subpath));
152 }
153 value
154 });
155 if let Some(key) = &key {
156 data.insert("key".to_string(), serde_json::Value::Object(key.clone()));
157 }
158 Message::MirrorErrorMsg(crate::types::MirrorErrorMessage { key, error, data })
159}
160
161#[allow(dead_code)]
162fn _keep_session_summary_entry_public(_: Option<SessionSummaryEntry>) {}
163
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session_store::{InMemorySessionStore, SessionKey, SessionStore};
    use async_trait::async_trait;

    /// Store stub whose `append` always fails and whose `load` finds nothing.
    struct FailingStore;

    #[async_trait]
    impl SessionStore for FailingStore {
        async fn append(&self, _key: SessionKey, _entries: Vec<SessionStoreEntry>) -> Result<()> {
            let failure = crate::error::ClaudeSDKError::Session("store failed".to_string());
            Err(failure)
        }

        async fn load(&self, _key: SessionKey) -> Result<Option<Vec<SessionStoreEntry>>> {
            Ok(None)
        }
    }

    /// Environment map that points `CLAUDE_CONFIG_DIR` at `config_dir`.
    fn config_env(config_dir: &std::path::Path) -> std::collections::HashMap<String, String> {
        std::collections::HashMap::from([(
            "CLAUDE_CONFIG_DIR".to_string(),
            config_dir.to_string_lossy().to_string(),
        )])
    }

    /// Mirror frame for `proj/session-1.jsonl` holding one user entry with `uuid`.
    fn mirror_frame(projects_dir: &std::path::Path, uuid: &str) -> serde_json::Value {
        let file_path = projects_dir.join("proj/session-1.jsonl");
        serde_json::json!({
            "type": "transcript_mirror",
            "filePath": file_path,
            "entries": [{
                "type": "user",
                "uuid": uuid,
                "message": {"content": format!("prompt {uuid}")}
            }]
        })
    }

    #[tokio::test]
    async fn batcher_coalesces_and_flushes_by_session_key() {
        let store = InMemorySessionStore::new();
        let config_dir =
            std::env::temp_dir().join(format!("claude-rust-mirror-test-{}", uuid::Uuid::new_v4()));
        let projects_dir = config_dir.join("projects");
        let options = ClaudeAgentOptions::builder()
            .env(config_env(&config_dir))
            .session_store(store.clone())
            .build();
        let mut batcher = TranscriptMirrorBatcher::from_options(&options).expect("batcher");

        // Two frames for the same file must stay buffered until flushed.
        for uuid in ["1", "2"] {
            batcher
                .enqueue_value(&mirror_frame(&projects_dir, uuid))
                .await
                .unwrap();
        }
        assert_eq!(batcher.pending_entries(), 2);

        let errors = batcher.flush().await.unwrap();
        assert!(errors.is_empty());

        let session_key = SessionKey {
            project_key: "proj".to_string(),
            session_id: "session-1".to_string(),
            subpath: None,
        };
        let stored = store.load(session_key).await.unwrap().unwrap();
        assert_eq!(stored.len(), 2);
        assert_eq!(stored[0]["uuid"], "1");
        assert_eq!(stored[1]["uuid"], "2");
    }

    #[tokio::test]
    async fn batcher_reports_store_failures_as_nonfatal_mirror_errors() {
        let config_dir = std::env::temp_dir().join(format!(
            "claude-rust-mirror-error-test-{}",
            uuid::Uuid::new_v4()
        ));
        let projects_dir = config_dir.join("projects");
        let options = ClaudeAgentOptions::builder()
            .env(config_env(&config_dir))
            .session_store(FailingStore)
            .build();
        let mut batcher = TranscriptMirrorBatcher::from_options(&options).expect("batcher");

        batcher
            .enqueue_value(&mirror_frame(&projects_dir, "1"))
            .await
            .unwrap();
        let errors = batcher.flush().await.unwrap();

        // The store failure surfaces as a message, not as an `Err`.
        assert_eq!(errors.len(), 1);
        let message = match &errors[0] {
            Message::MirrorErrorMsg(message) => message,
            other => panic!("expected mirror_error system message, got {other:?}"),
        };
        assert!(message.error.contains("store failed"));
        let project_key = message
            .key
            .as_ref()
            .and_then(|fields| fields.get("project_key"));
        assert_eq!(project_key, Some(&serde_json::json!("proj")));
    }
}