1use crate::constants::env::system;
3use crate::types::Message;
4use serde::{Deserialize, Serialize};
5use std::io::{Read, Write};
6use std::path::PathBuf;
7use tokio::fs;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct SessionMetadata {
12 pub id: String,
13 pub cwd: String,
14 pub model: String,
15 #[serde(rename = "createdAt")]
16 pub created_at: String,
17 #[serde(rename = "updatedAt")]
18 pub updated_at: String,
19 #[serde(rename = "messageCount")]
20 pub message_count: u32,
21 pub summary: Option<String>,
22 pub tag: Option<String>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct SessionData {
28 pub metadata: SessionMetadata,
29 pub messages: Vec<Message>,
30}
31
32pub fn get_sessions_dir() -> PathBuf {
34 let home = std::env::var(system::HOME)
35 .or_else(|_| std::env::var(system::USERPROFILE))
36 .unwrap_or_else(|_| "/tmp".to_string());
37 PathBuf::from(home).join(".open-agent-sdk").join("sessions")
38}
39
40pub fn get_session_path(session_id: &str) -> PathBuf {
42 get_sessions_dir().join(session_id)
43}
44
45pub async fn save_session(
47 session_id: &str,
48 messages: Vec<Message>,
49 metadata: Option<SessionMetadata>,
50) -> Result<(), crate::error::AgentError> {
51 let dir = get_session_path(session_id);
52 fs::create_dir_all(&dir)
53 .await
54 .map_err(crate::error::AgentError::Io)?;
55
56 let cwd = metadata
57 .as_ref()
58 .and_then(|m| Some(m.cwd.clone()))
59 .unwrap_or_else(|| {
60 std::env::current_dir()
61 .unwrap_or_default()
62 .to_string_lossy()
63 .to_string()
64 });
65
66 let model = metadata
67 .as_ref()
68 .and_then(|m| Some(m.model.clone()))
69 .unwrap_or_else(|| "claude-sonnet-4-6".to_string());
70
71 let created_at = metadata
72 .as_ref()
73 .and_then(|m| Some(m.created_at.clone()))
74 .unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
75
76 let summary = metadata.as_ref().and_then(|m| m.summary.clone());
77 let tag = metadata.as_ref().and_then(|m| m.tag.clone());
78
79 let data = SessionData {
80 metadata: SessionMetadata {
81 id: session_id.to_string(),
82 cwd,
83 model,
84 created_at: created_at.clone(),
85 updated_at: chrono::Utc::now().to_rfc3339(),
86 message_count: messages.len() as u32,
87 summary,
88 tag,
89 },
90 messages,
91 };
92
93 let path = dir.join("transcript.json");
94 let json = serde_json::to_string_pretty(&data).map_err(crate::error::AgentError::Json)?;
95 fs::write(&path, json)
96 .await
97 .map_err(crate::error::AgentError::Io)?;
98
99 Ok(())
100}
101
102pub async fn load_session(
104 session_id: &str,
105) -> Result<Option<SessionData>, crate::error::AgentError> {
106 let path = get_session_path(session_id).join("transcript.json");
107
108 match fs::read_to_string(&path).await {
109 Ok(content) => {
110 let data: SessionData =
111 serde_json::from_str(&content).map_err(crate::error::AgentError::Json)?;
112 Ok(Some(data))
113 }
114 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
115 Err(e) => Err(crate::error::AgentError::Io(e)),
116 }
117}
118
119pub async fn list_sessions() -> Result<Vec<SessionMetadata>, crate::error::AgentError> {
121 let dir = get_sessions_dir();
122
123 let mut entries = match fs::read_dir(&dir).await {
124 Ok(entries) => entries,
125 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(vec![]),
126 Err(e) => return Err(crate::error::AgentError::Io(e)),
127 };
128
129 let mut sessions = Vec::new();
130
131 while let Some(entry) = entries
132 .next_entry()
133 .await
134 .map_err(crate::error::AgentError::Io)?
135 {
136 let entry_id = entry.file_name().to_string_lossy().to_string();
137 if let Ok(Some(data)) = load_session(&entry_id).await {
138 if let Some(metadata) = Some(data.metadata) {
139 sessions.push(metadata);
140 }
141 }
142 }
143
144 sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
146
147 Ok(sessions)
148}
149
150pub async fn fork_session(
152 source_session_id: &str,
153 new_session_id: Option<String>,
154) -> Result<Option<String>, crate::error::AgentError> {
155 let data = match load_session(source_session_id).await? {
156 Some(d) => d,
157 None => return Ok(None),
158 };
159
160 let fork_id = new_session_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
161
162 save_session(
163 &fork_id,
164 data.messages,
165 Some(SessionMetadata {
166 id: fork_id.clone(),
167 cwd: data.metadata.cwd,
168 model: data.metadata.model,
169 created_at: chrono::Utc::now().to_rfc3339(),
170 updated_at: chrono::Utc::now().to_rfc3339(),
171 message_count: data.metadata.message_count,
172 summary: Some(format!("Forked from session {}", source_session_id)),
173 tag: None,
174 }),
175 )
176 .await?;
177
178 Ok(Some(fork_id))
179}
180
181pub async fn get_session_messages(
183 session_id: &str,
184) -> Result<Vec<Message>, crate::error::AgentError> {
185 match load_session(session_id).await? {
186 Some(data) => Ok(data.messages),
187 None => Ok(vec![]),
188 }
189}
190
191pub async fn append_to_session(
193 session_id: &str,
194 message: Message,
195) -> Result<(), crate::error::AgentError> {
196 let mut data = match load_session(session_id).await? {
197 Some(d) => d,
198 None => return Ok(()),
199 };
200
201 data.messages.push(message);
202 data.metadata.updated_at = chrono::Utc::now().to_rfc3339();
203 data.metadata.message_count = data.messages.len() as u32;
204
205 save_session(session_id, data.messages, Some(data.metadata)).await
206}
207
208pub async fn delete_session(session_id: &str) -> Result<bool, crate::error::AgentError> {
210 let path = get_session_path(session_id);
211
212 match fs::remove_dir_all(&path).await {
213 Ok(_) => Ok(true),
214 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
215 Err(e) => Err(crate::error::AgentError::Io(e)),
216 }
217}
218
219pub async fn get_session_info(
221 session_id: &str,
222) -> Result<Option<SessionMetadata>, crate::error::AgentError> {
223 match load_session(session_id).await? {
224 Some(data) => Ok(Some(data.metadata)),
225 None => Ok(None),
226 }
227}
228
229pub async fn rename_session(session_id: &str, title: &str) -> Result<(), crate::error::AgentError> {
231 let mut data = match load_session(session_id).await? {
232 Some(d) => d,
233 None => return Ok(()),
234 };
235
236 data.metadata.summary = Some(title.to_string());
237 data.metadata.updated_at = chrono::Utc::now().to_rfc3339();
238
239 save_session(session_id, data.messages, Some(data.metadata)).await
240}
241
242pub async fn tag_session(
244 session_id: &str,
245 tag: Option<&str>,
246) -> Result<(), crate::error::AgentError> {
247 let mut data = match load_session(session_id).await? {
248 Some(d) => d,
249 None => return Ok(()),
250 };
251
252 data.metadata.tag = tag.map(|s| s.to_string());
253 data.metadata.updated_at = chrono::Utc::now().to_rfc3339();
254
255 save_session(session_id, data.messages, Some(data.metadata)).await
256}
257
258#[derive(Debug, Clone, Default)]
260pub struct ResumeConfig {
261 pub max_tail_messages: Option<usize>,
263 pub tail_uuid: Option<String>,
266}
267
268#[derive(Debug, Clone)]
270pub struct ResumeResult {
271 pub messages: Vec<Message>,
273 pub metadata: Option<SessionMetadata>,
275 pub dropped_count: usize,
277}
278
279pub async fn resume_session(
289 session_id: &str,
290 config: &ResumeConfig,
291) -> Result<ResumeResult, crate::error::AgentError> {
292 let data = match load_session(session_id).await? {
293 Some(d) => d,
294 None => {
295 return Ok(ResumeResult {
296 messages: vec![],
297 metadata: None,
298 dropped_count: 0,
299 })
300 }
301 };
302
303 let mut messages = data.messages;
304 let mut dropped = 0;
305
306 if let Some(ref tail_uuid) = config.tail_uuid {
308 if let Some(idx) = messages.iter().position(|m| is_message_uuid(m, tail_uuid)) {
310 let after_tail = messages.drain(idx + 1..).collect::<Vec<_>>();
311 dropped += messages.len();
312 messages = after_tail;
313 }
314 }
316
317 if let Some(max_tail) = config.max_tail_messages {
319 if messages.len() > max_tail {
320 let dropped_tail = messages.len() - max_tail;
321 messages.drain(..dropped_tail);
322 dropped += dropped_tail;
323 }
324 }
325
326 let before_dedup = messages.len();
328 messages = deduplicate_messages(messages);
329 dropped += before_dedup - messages.len();
330
331 Ok(ResumeResult {
332 messages,
333 metadata: Some(data.metadata),
334 dropped_count: dropped,
335 })
336}
337
338fn is_message_uuid(msg: &Message, uuid: &str) -> bool {
342 if let Some(ref tool_call_id) = msg.tool_call_id {
344 if tool_call_id == uuid {
345 return true;
346 }
347 }
348 let content_hash = format!("{:x}", md5_hash(&msg.content));
350 content_hash == uuid
351}
352
353fn md5_hash(content: &str) -> u64 {
355 let mut hash: u64 = 5381;
356 for b in content.bytes() {
357 hash = hash.wrapping_mul(33).wrapping_add(b as u64);
358 }
359 hash
360}
361
362fn deduplicate_messages(messages: Vec<Message>) -> Vec<Message> {
365 let mut seen = std::collections::HashSet::new();
366 let mut result = Vec::with_capacity(messages.len());
367 for msg in messages {
368 let key = (msg.role.clone(), msg.content.clone());
369 if seen.insert(key) {
370 result.push(msg);
371 }
372 }
373 result
374}
375
376pub fn create_preserved_segment(
381 messages: &[Message],
382 max_tokens: u32,
383 tail_count: usize,
384) -> Vec<Message> {
385 let tail = &messages[messages.len().saturating_sub(tail_count)..];
386 let mut tokens = 0;
387 let mut result = Vec::new();
388
389 for msg in tail.iter().rev() {
390 let msg_tokens = crate::compact::rough_token_count_estimation_for_content(&msg.content);
391 if tokens + msg_tokens > max_tokens as usize {
392 break;
393 }
394 tokens += msg_tokens;
395 result.push(msg.clone());
396 }
397
398 result.reverse();
400 result
401}
402
403use crate::cli_ndjson_safe_stringify::serialize_to_ndjson;
414use std::collections::HashMap;
415use tokio::io::AsyncWriteExt;
416use std::sync::LazyLock;
417use tokio::time;
418
419#[derive(Debug, Clone, Serialize, Deserialize)]
421pub struct SessionEntry {
422 #[serde(skip_serializing_if = "Option::is_none")]
423 #[serde(rename = "t")]
424 pub timestamp: Option<String>,
425 #[serde(skip_serializing_if = "Option::is_none")]
426 #[serde(rename = "type")]
427 pub entry_type: Option<String>,
428 #[serde(skip_serializing_if = "Option::is_none")]
429 #[serde(rename = "d")]
430 pub data: Option<serde_json::Value>,
431}
432
433impl SessionEntry {
434 pub fn message(message: &Message) -> Self {
435 Self {
436 timestamp: Some(chrono::Utc::now().to_rfc3339()),
437 entry_type: Some("message".to_string()),
438 data: Some(serde_json::to_value(message).unwrap_or(serde_json::Value::Null)),
439 }
440 }
441
442 pub fn metadata(metadata: &SessionMetadata) -> Self {
443 Self {
444 timestamp: Some(chrono::Utc::now().to_rfc3339()),
445 entry_type: Some("metadata".to_string()),
446 data: Some(
447 serde_json::to_value(metadata).unwrap_or(serde_json::Value::Null),
448 ),
449 }
450 }
451
452 pub fn sidechain_message(message: &Message, agent_id: &str, parent_uuid: Option<&str>) -> Self {
454 let mut data_obj = serde_json::to_value(message).unwrap_or(serde_json::Value::Null);
455 if let Some(obj) = data_obj.as_object_mut() {
456 obj.insert("agentId".to_string(), serde_json::json!(agent_id));
457 obj.insert("isSidechain".to_string(), serde_json::json!(true));
458 if let Some(uuid) = parent_uuid {
459 obj.insert("parentUuid".to_string(), serde_json::json!(uuid));
460 }
461 }
462 Self {
463 timestamp: Some(chrono::Utc::now().to_rfc3339()),
464 entry_type: Some("message".to_string()),
465 data: Some(data_obj),
466 }
467 }
468}
469
470pub fn get_sidechain_jsonl_path(session_id: &str, agent_id: &str) -> PathBuf {
473 get_session_path(session_id)
474 .join("sidechains")
475 .join(format!("{}.jsonl", agent_id))
476}
477
478pub async fn record_sidechain_transcript(
483 session_id: &str,
484 messages: &[Message],
485 agent_id: &str,
486 starting_parent_uuid: Option<String>,
487) -> Result<(), crate::error::AgentError> {
488 let mut current_parent_uuid = starting_parent_uuid;
489
490 for message in messages {
491 let entry =
492 SessionEntry::sidechain_message(message, agent_id, current_parent_uuid.as_deref());
493
494 let path = get_sidechain_jsonl_path(session_id, agent_id);
495 let line =
496 crate::cli_ndjson_safe_stringify::serialize_to_ndjson(&entry)
497 .map_err(crate::error::AgentError::Json)?;
498
499 tokio::task::spawn_blocking(move || -> std::result::Result<(), crate::error::AgentError> {
501 std::fs::create_dir_all(path.parent().unwrap())
502 .map_err(crate::error::AgentError::Io)?;
503 let _guard = SESSION_WRITE_LOCK.lock().unwrap();
504 let mut file = std::fs::OpenOptions::new()
505 .create(true)
506 .append(true)
507 .open(&path)
508 .map_err(crate::error::AgentError::Io)?;
509 file.write_all(format!("{line}\n").as_bytes())
510 .map_err(crate::error::AgentError::Io)?;
511 Ok(())
512 })
513 .await
514 .map_err(|_| crate::error::AgentError::Io(std::io::Error::new(
515 std::io::ErrorKind::Other,
516 "task joined",
517 )))??;
518
519 current_parent_uuid = Some(uuid::Uuid::new_v4().to_string());
521 }
522
523 Ok(())
524}
525
526pub async fn insert_message_chain(
532 session_id: &str,
533 messages: &[Message],
534 is_sidechain: bool,
535 agent_id: Option<String>,
536 starting_parent_uuid: Option<String>,
537) -> Result<(), crate::error::AgentError> {
538 if is_sidechain {
539 let aid = agent_id.unwrap_or_else(|| "default".to_string());
540 record_sidechain_transcript(session_id, messages, &aid, starting_parent_uuid).await
541 } else {
542 for message in messages {
543 append_session_message(session_id, message).await?;
544 }
545 Ok(())
546 }
547}
548
549pub fn get_jsonl_path(session_id: &str) -> PathBuf {
551 get_session_path(session_id).join(format!("{session_id}.jsonl"))
552}
553
554pub async fn append_session_entry(
559 session_id: &str,
560 entry: &SessionEntry,
561) -> Result<(), crate::error::AgentError> {
562 let path = get_jsonl_path(session_id);
563 fs::create_dir_all(path.parent().unwrap())
564 .await
565 .map_err(crate::error::AgentError::Io)?;
566
567 let line = serialize_to_ndjson(entry).map_err(crate::error::AgentError::Json)?;
568 tokio::task::spawn_blocking(move || -> std::result::Result<(), crate::error::AgentError> {
571 let _guard = SESSION_WRITE_LOCK.lock().unwrap();
572 std::fs::create_dir_all(path.parent().unwrap())
573 .map_err(crate::error::AgentError::Io)?;
574 let mut file = std::fs::OpenOptions::new()
575 .create(true)
576 .append(true)
577 .open(&path)
578 .map_err(crate::error::AgentError::Io)?;
579 file.write_all(format!("{line}\n").as_bytes())
580 .map_err(crate::error::AgentError::Io)?;
581 Ok(())
582 })
583 .await
584 .map_err(|_| crate::error::AgentError::Io(std::io::Error::new(
585 std::io::ErrorKind::Other,
586 "task joined",
587 )))??;
588 Ok(())
589}
590
591pub async fn append_session_message(
595 session_id: &str,
596 message: &Message,
597) -> Result<(), crate::error::AgentError> {
598 let entry = SessionEntry::message(message);
599 append_session_entry(session_id, &entry).await
600}
601
602pub async fn load_session_jsonl(
607 session_id: &str,
608) -> Result<Option<SessionData>, crate::error::AgentError> {
609 let path = get_jsonl_path(session_id);
610 match fs::read_to_string(&path).await {
611 Ok(content) => {
612 let mut messages = Vec::new();
613 let mut metadata: Option<SessionMetadata> = None;
614
615 for line in content.lines() {
616 let line = line.trim().to_string();
617 if line.is_empty() {
618 continue;
619 }
620 let entry: SessionEntry =
621 serde_json::from_str(&line).map_err(crate::error::AgentError::Json)?;
622 if entry.entry_type.as_deref() == Some("message") {
623 if let Some(data) = &entry.data {
624 let msg: Message =
625 serde_json::from_value(data.clone()).map_err(crate::error::AgentError::Json)?;
626 messages.push(msg);
627 }
628 } else if entry.entry_type.as_deref() == Some("metadata") {
629 if let Some(data) = &entry.data {
630 metadata =
631 Some(serde_json::from_value(data.clone()).map_err(crate::error::AgentError::Json)?);
632 }
633 }
634 }
635
636 if messages.is_empty() && metadata.is_none() {
637 return Ok(None);
638 }
639
640 let final_metadata = metadata.unwrap_or_else(|| SessionMetadata {
641 id: session_id.to_string(),
642 cwd: std::env::current_dir()
643 .unwrap_or_default()
644 .to_string_lossy()
645 .to_string(),
646 model: "claude-sonnet-4-6".to_string(),
647 created_at: chrono::Utc::now().to_rfc3339(),
648 updated_at: chrono::Utc::now().to_rfc3339(),
649 message_count: messages.len() as u32,
650 summary: None,
651 tag: None,
652 });
653
654 Ok(Some(SessionData {
655 metadata: final_metadata,
656 messages,
657 }))
658 }
659 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
660 Err(e) => Err(crate::error::AgentError::Io(e)),
661 }
662}
663
664static SESSION_PENDING: LazyLock<std::sync::Mutex<HashMap<String, Vec<String>>>> =
666 LazyLock::new(|| std::sync::Mutex::new(HashMap::new()));
667
668static SESSION_DRAINING: LazyLock<std::sync::Mutex<bool>> =
670 LazyLock::new(|| std::sync::Mutex::new(false));
671
672static SESSION_RESET_REQUESTED: LazyLock<std::sync::Mutex<bool>> =
674 LazyLock::new(|| std::sync::Mutex::new(false));
675
676static SESSION_DRAIN_PAUSED: LazyLock<std::sync::Mutex<bool>> =
679 LazyLock::new(|| std::sync::Mutex::new(false));
680
681static SESSION_WRITE_LOCK: LazyLock<std::sync::Mutex<()>> =
687 LazyLock::new(|| std::sync::Mutex::new(()));
688
689
690const SESSION_FLUSH_INTERVAL_MS: u64 = 100;
692
693pub struct SessionWriter;
694
695impl SessionWriter {
696 pub fn enqueue(session_id: &str, line: String) {
699 {
700 let mut pending = SESSION_PENDING.lock().unwrap();
701 pending
702 .entry(session_id.to_string())
703 .or_default()
704 .push(line);
705 }
706
707 {
709 let paused = *SESSION_DRAIN_PAUSED.lock().unwrap();
710 if paused {
711 return;
712 }
713 let mut draining = SESSION_DRAINING.lock().unwrap();
714 if *draining {
715 return;
716 }
717 *draining = true;
718 }
719 tokio::spawn(Self::drain_loop());
720 }
721
722 async fn drain_loop() {
726 let mut ticks = 0u32;
727 loop {
728 time::sleep(time::Duration::from_millis(10)).await;
729 ticks += 1;
730 if *SESSION_DRAIN_PAUSED.lock().unwrap() {
732 *SESSION_DRAINING.lock().unwrap() = false;
733 return;
734 }
735 if *SESSION_RESET_REQUESTED.lock().unwrap() {
737 Self::drain().await;
738 *SESSION_DRAINING.lock().unwrap() = false;
739 return;
740 }
741 if ticks % ((SESSION_FLUSH_INTERVAL_MS / 10) as u32) == 0 {
743 if Self::drain().await {
744 *SESSION_DRAINING.lock().unwrap() = false;
745 break;
746 }
747 }
748 }
749 }
750
751 pub async fn drain() -> bool {
753 if *SESSION_RESET_REQUESTED.lock().unwrap() {
757 return false;
758 }
759
760 let to_drain = {
761 let mut pending = SESSION_PENDING.lock().unwrap();
762 let mut batch = HashMap::new();
763 for (session_id, lines) in pending.iter_mut() {
764 if !lines.is_empty() {
765 batch.insert(session_id.clone(), lines.clone());
766 lines.clear();
767 }
768 }
769 batch
770 };
771
772 if to_drain.is_empty() {
773 return SESSION_PENDING.lock().unwrap().is_empty();
774 }
775
776 if *SESSION_RESET_REQUESTED.lock().unwrap() {
778 return false;
779 }
780
781 tokio::task::spawn_blocking(move || {
785 let _guard = SESSION_WRITE_LOCK.lock().unwrap();
786 for (session_id, lines) in to_drain {
787 let path = get_jsonl_path(&session_id);
788 let content: String = lines.join("\n");
789 let _ = std::fs::create_dir_all(path.parent().unwrap());
790 if let Ok(mut file) = std::fs::OpenOptions::new()
791 .create(true)
792 .append(true)
793 .open(&path)
794 {
795 let _ = file.write_all(format!("{content}\n").as_bytes());
796 }
797 }
798 })
799 .await
800 .ok();
801
802 SESSION_PENDING.lock().unwrap().is_empty()
803 }
804
805 pub async fn flush(_session_id: &str) {
807 Self::drain().await;
808 }
809}
810
811pub fn reset_session_globals_for_testing() {
813 *SESSION_DRAIN_PAUSED.lock().unwrap() = true;
815 *SESSION_RESET_REQUESTED.lock().unwrap() = true;
817 let start = std::time::Instant::now();
820 while start.elapsed() < std::time::Duration::from_millis(500) {
821 if !*SESSION_DRAINING.lock().unwrap() {
822 break;
823 }
824 std::thread::sleep(std::time::Duration::from_millis(20));
825 }
826 *SESSION_DRAINING.lock().unwrap() = false;
829 *SESSION_RESET_REQUESTED.lock().unwrap() = false;
830}
831
832pub fn enqueue_session_message(session_id: &str, message: &Message) {
837 let line = serialize_to_ndjson(&SessionEntry::message(message))
838 .unwrap_or_default();
839 SessionWriter::enqueue(session_id, line);
840}
841
842pub fn enqueue_session_metadata(session_id: &str, metadata: &SessionMetadata) {
844 let line = serialize_to_ndjson(&SessionEntry::metadata(metadata))
845 .unwrap_or_default();
846 SessionWriter::enqueue(session_id, line);
847}
848
849pub async fn drain_all_sessions() {
851 loop {
852 if SessionWriter::drain().await {
853 break;
854 }
855 }
856}
857
858#[cfg(test)]
859mod resume_tests {
860 use super::*;
861
862 #[test]
863 fn test_deduplicate_messages() {
864 let messages = vec![
865 Message {
866 role: crate::types::MessageRole::User,
867 content: "hello".to_string(),
868 ..Default::default()
869 },
870 Message {
871 role: crate::types::MessageRole::User,
872 content: "hello".to_string(),
873 ..Default::default()
874 },
875 Message {
876 role: crate::types::MessageRole::Assistant,
877 content: "hi back".to_string(),
878 ..Default::default()
879 },
880 ];
881 let deduped = deduplicate_messages(messages);
882 assert_eq!(deduped.len(), 2);
883 }
884
885 #[test]
886 fn test_deduplicate_preserves_order() {
887 let messages = vec![
888 Message {
889 role: crate::types::MessageRole::User,
890 content: "first".to_string(),
891 ..Default::default()
892 },
893 Message {
894 role: crate::types::MessageRole::Assistant,
895 content: "second".to_string(),
896 ..Default::default()
897 },
898 Message {
899 role: crate::types::MessageRole::User,
900 content: "first".to_string(),
901 ..Default::default()
902 },
903 ];
904 let deduped = deduplicate_messages(messages);
905 assert_eq!(deduped.len(), 2);
906 assert_eq!(deduped[0].content, "first");
907 assert_eq!(deduped[1].content, "second");
908 }
909
910 #[tokio::test]
911 async fn test_resume_session_not_found() {
912 let config = ResumeConfig::default();
913 let result = resume_session("nonexistent-id", &config).await;
914 assert!(result.is_ok());
915 let r = result.unwrap();
916 assert!(r.messages.is_empty());
917 assert!(r.metadata.is_none());
918 }
919
920 #[test]
921 fn test_create_preserved_segment() {
922 let messages: Vec<Message> = (0..10)
923 .map(|i| Message {
924 role: crate::types::MessageRole::User,
925 content: format!("msg {}", i),
926 ..Default::default()
927 })
928 .collect();
929 let segment = create_preserved_segment(&messages, 100, 5);
930 assert!(!segment.is_empty());
931 assert!(segment.len() <= 5);
932 for i in 1..segment.len() {
934 assert!(segment[i].content > segment[i - 1].content);
935 }
936 }
937
938 #[test]
939 fn test_create_preserved_segment_respects_token_budget() {
940 let messages: Vec<Message> = (0..100)
941 .map(|i| Message {
942 role: crate::types::MessageRole::User,
943 content: "x".repeat(10_000),
944 ..Default::default()
945 })
946 .collect();
947 let segment = create_preserved_segment(&messages, 5_000, 10);
948 assert!(segment.len() <= 2);
949 }
950
951 #[test]
952 fn test_is_message_uuid_matches_tool_call_id() {
953 let msg = Message {
954 tool_call_id: Some("abc-123".to_string()),
955 ..Default::default()
956 };
957 assert!(is_message_uuid(&msg, "abc-123"));
958 assert!(!is_message_uuid(&msg, "other-id"));
959 }
960
961 #[test]
962 fn test_md5_hash_deterministic() {
963 let h1 = md5_hash("hello world");
964 let h2 = md5_hash("hello world");
965 assert_eq!(h1, h2);
966 assert_ne!(h1, md5_hash("different"));
967 }
968}
969mod tests {
970 use super::*;
971 use crate::types::MessageRole;
972
973 fn create_test_message(content: &str) -> Message {
974 Message {
975 role: MessageRole::User,
976 content: content.to_string(),
977 ..Default::default()
978 }
979 }
980
981 #[tokio::test]
982 async fn test_get_sessions_dir() {
983 let dir = get_sessions_dir();
984 assert!(dir.to_string_lossy().contains(".open-agent-sdk"));
985 }
986
987 #[tokio::test]
988 async fn test_save_and_load_session() {
989 let _session_id = format!("test-session-{}", uuid::Uuid::new_v4());
990 let session_id = _session_id.as_str();
991 let messages = vec![create_test_message("Hello")];
992
993 save_session(session_id, messages.clone(), None)
995 .await
996 .unwrap();
997
998 let loaded = load_session(session_id).await.unwrap();
1000 assert!(loaded.is_some());
1001 assert_eq!(loaded.unwrap().messages.len(), 1);
1002
1003 delete_session(session_id).await.unwrap();
1005 }
1006
1007 #[tokio::test]
1008 async fn test_load_nonexistent_session() {
1009 let loaded = load_session("nonexistent-session").await.unwrap();
1010 assert!(loaded.is_none());
1011 }
1012
1013 #[tokio::test]
1014 async fn test_fork_session() {
1015 let _source_id = format!("fork-source-{}", uuid::Uuid::new_v4());
1016 let source_id = _source_id.as_str();
1017 let messages = vec![
1018 create_test_message("First"),
1019 Message {
1020 role: MessageRole::Assistant,
1021 content: "Response".to_string(),
1022 ..Default::default()
1023 },
1024 ];
1025
1026 save_session(source_id, messages, None).await.unwrap();
1028
1029 let fork_id = fork_session(source_id, None).await.unwrap();
1031 assert!(fork_id.is_some());
1032
1033 let fork_messages = get_session_messages(fork_id.as_ref().unwrap())
1035 .await
1036 .unwrap();
1037 assert_eq!(fork_messages.len(), 2);
1038
1039 delete_session(source_id).await.unwrap();
1041 delete_session(fork_id.as_ref().unwrap()).await.unwrap();
1042 }
1043
1044 #[tokio::test]
1045 async fn test_append_to_session() {
1046 let _session_id = format!("append-test-{}", uuid::Uuid::new_v4());
1047 let session_id = _session_id.as_str();
1048
1049 save_session(session_id, vec![create_test_message("Initial")], None)
1051 .await
1052 .unwrap();
1053
1054 append_to_session(
1056 session_id,
1057 Message {
1058 role: MessageRole::Assistant,
1059 content: "Response".to_string(),
1060 ..Default::default()
1061 },
1062 )
1063 .await
1064 .unwrap();
1065
1066 let loaded = load_session(session_id).await.unwrap().unwrap();
1068 assert_eq!(loaded.messages.len(), 2);
1069
1070 delete_session(session_id).await.unwrap();
1072 }
1073
1074 #[tokio::test]
1075 async fn test_rename_session() {
1076 let _session_id = format!("rename-test-{}", uuid::Uuid::new_v4());
1077 let session_id = _session_id.as_str();
1078 save_session(session_id, vec![create_test_message("Test")], None)
1079 .await
1080 .unwrap();
1081
1082 rename_session(session_id, "My Session").await.unwrap();
1083
1084 let info = get_session_info(session_id).await.unwrap().unwrap();
1085 assert_eq!(info.summary, Some("My Session".to_string()));
1086
1087 delete_session(session_id).await.unwrap();
1089 }
1090
1091 #[tokio::test]
1092 async fn test_tag_session() {
1093 let _session_id = format!("tag-test-{}", uuid::Uuid::new_v4());
1094 let session_id = _session_id.as_str();
1095 save_session(session_id, vec![create_test_message("Test")], None)
1096 .await
1097 .unwrap();
1098
1099 tag_session(session_id, Some("important")).await.unwrap();
1100
1101 let info = get_session_info(session_id).await.unwrap().unwrap();
1102 assert_eq!(info.tag, Some("important".to_string()));
1103
1104 delete_session(session_id).await.unwrap();
1106 }
1107
1108 #[tokio::test]
1109 async fn test_delete_session() {
1110 let _session_id = format!("delete-test-{}", uuid::Uuid::new_v4());
1111 let session_id = _session_id.as_str();
1112 save_session(session_id, vec![create_test_message("Test")], None)
1113 .await
1114 .unwrap();
1115
1116 let result = delete_session(session_id).await.unwrap();
1117 assert!(result);
1118
1119 let loaded = load_session(session_id).await.unwrap();
1121 assert!(loaded.is_none());
1122 }
1123}
1124
1125#[cfg(test)]
1126mod ndjson_tests {
1127 use super::*;
1128
1129 #[test]
1130 fn test_session_entry_message() {
1131 let msg = Message {
1132 role: crate::types::MessageRole::User,
1133 content: "hello world".to_string(),
1134 ..Default::default()
1135 };
1136 let entry = SessionEntry::message(&msg);
1137 assert_eq!(entry.entry_type, Some("message".to_string()));
1138 assert!(entry.timestamp.is_some());
1139 assert!(entry.data.is_some());
1140 }
1141
1142 #[test]
1143 fn test_session_entry_metadata() {
1144 let meta = SessionMetadata {
1145 id: "test-session".to_string(),
1146 cwd: "/tmp".to_string(),
1147 model: "claude-sonnet-4-6".to_string(),
1148 created_at: chrono::Utc::now().to_rfc3339(),
1149 updated_at: chrono::Utc::now().to_rfc3339(),
1150 message_count: 5,
1151 summary: None,
1152 tag: None,
1153 };
1154 let entry = SessionEntry::metadata(&meta);
1155 assert_eq!(entry.entry_type, Some("metadata".to_string()));
1156 }
1157
1158 #[test]
1159 fn test_session_entry_serializes() {
1160 let msg = Message {
1161 role: crate::types::MessageRole::User,
1162 content: "test message".to_string(),
1163 ..Default::default()
1164 };
1165 let entry = SessionEntry::message(&msg);
1166 let json = serde_json::to_string(&entry).unwrap();
1167 assert!(json.contains("\"type\":\"message\""));
1168 assert!(json.contains("\"t\""));
1169 }
1170
1171 #[test]
1172 fn test_session_entry_serializes_with_unicode() {
1173 let msg = Message {
1174 role: crate::types::MessageRole::User,
1175 content: "test\u{2028}line\u{2029}sep".to_string(),
1176 ..Default::default()
1177 };
1178 let entry = SessionEntry::message(&msg);
1179 let json = serialize_to_ndjson(&entry).unwrap();
1180 assert!(json.contains("\\u2028"));
1182 assert!(json.contains("\\u2029"));
1183 assert!(serde_json::from_str::<serde_json::Value>(&json).is_ok());
1185 }
1186
1187 #[test]
1188 fn test_get_jsonl_path() {
1189 let path = get_jsonl_path("test-session-123");
1190 assert!(path.to_string_lossy().contains("test-session-123"));
1191 assert!(path.extension().map(|e| e == "jsonl").unwrap_or(false));
1192 }
1193
1194 #[tokio::test]
1195 async fn test_append_session_entry() {
1196 crate::tests::common::clear_all_test_state();
1197 let session_id = format!("ndjson-append-test-{}", uuid::Uuid::new_v4());
1198 let msg = Message {
1199 role: crate::types::MessageRole::User,
1200 content: "first message".to_string(),
1201 ..Default::default()
1202 };
1203 let entry = SessionEntry::message(&msg);
1204
1205 append_session_entry(&session_id, &entry).await.unwrap();
1206
1207 let path = get_jsonl_path(&session_id);
1209 assert!(path.exists());
1210
1211 let content = fs::read_to_string(&path).await.unwrap();
1213 let lines: Vec<&str> = content.lines().collect();
1214 assert_eq!(lines.len(), 1);
1215 let parsed: SessionEntry = serde_json::from_str(lines[0]).unwrap();
1216 assert_eq!(parsed.entry_type, Some("message".to_string()));
1217
1218 let msg2 = Message {
1220 role: crate::types::MessageRole::Assistant,
1221 content: "response".to_string(),
1222 ..Default::default()
1223 };
1224 let entry2 = SessionEntry::message(&msg2);
1225 append_session_entry(&session_id, &entry2).await.unwrap();
1226
1227 let content = fs::read_to_string(&path).await.unwrap();
1228 let lines: Vec<&str> = content.lines().collect();
1229 assert_eq!(lines.len(), 2);
1230 let parsed2: SessionEntry = serde_json::from_str(lines[1]).unwrap();
1231 assert_eq!(parsed2.entry_type, Some("message".to_string()));
1232
1233 let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1235 }
1236
1237 #[tokio::test]
1238 async fn test_load_session_jsonl() {
1239 crate::tests::common::clear_all_test_state();
1240 let session_id = format!("ndjson-load-test-{}", uuid::Uuid::new_v4());
1241
1242 let dir = get_session_path(&session_id);
1244 fs::create_dir_all(&dir).await.unwrap();
1245
1246 let msg1 = Message {
1247 role: crate::types::MessageRole::User,
1248 content: "hello".to_string(),
1249 ..Default::default()
1250 };
1251 let msg2 = Message {
1252 role: crate::types::MessageRole::Assistant,
1253 content: "hi there".to_string(),
1254 ..Default::default()
1255 };
1256 append_session_entry(&session_id, &SessionEntry::message(&msg1)).await.unwrap();
1257 append_session_entry(&session_id, &SessionEntry::message(&msg2)).await.unwrap();
1258
1259 let data = load_session_jsonl(&session_id).await.unwrap();
1261 assert!(data.is_some());
1262 let data = data.unwrap();
1263 assert_eq!(data.messages.len(), 2);
1264 assert_eq!(data.messages[0].content, "hello");
1265 assert_eq!(data.messages[1].content, "hi there");
1266
1267 let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1269 }
1270
1271 #[tokio::test]
1272 async fn test_append_session_message() {
1273 crate::tests::common::clear_all_test_state();
1274 let session_id = format!("ndjson-append-msg-{}", uuid::Uuid::new_v4());
1275
1276 let msg = Message {
1277 role: crate::types::MessageRole::User,
1278 content: "quick test".to_string(),
1279 ..Default::default()
1280 };
1281 append_session_message(&session_id, &msg).await.unwrap();
1282
1283 let path = get_jsonl_path(&session_id);
1284 assert!(path.exists());
1285
1286 let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1288 }
1289
1290 #[tokio::test]
1291 async fn test_load_empty_jsonl() {
1292 crate::tests::common::clear_all_test_state();
1293 let session_id = format!("ndjson-empty-test-{}", uuid::Uuid::new_v4());
1294 let result = load_session_jsonl(&session_id).await.unwrap();
1295 assert!(result.is_none());
1296 }
1297
1298 #[tokio::test]
1299 async fn test_enqueue_and_drain() {
1300 crate::tests::common::clear_all_test_state();
1301 let session_id = format!("ndjson-enqueue-test-{}", uuid::Uuid::new_v4());
1302
1303 SessionWriter::enqueue(&session_id, "{\"test\":1}".to_string());
1304 SessionWriter::enqueue(&session_id, "{\"test\":2}".to_string());
1305
1306 SessionWriter::drain().await;
1308
1309 let path = get_jsonl_path(&session_id);
1311 assert!(path.exists());
1312 let content = fs::read_to_string(&path).await.unwrap();
1313 assert!(content.contains("{\"test\":1}"));
1314 assert!(content.contains("{\"test\":2}"));
1315
1316 {
1318 let mut pending = SESSION_PENDING.lock().unwrap();
1319 pending.remove(&session_id);
1320 }
1321 let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1322 }
1323
1324 #[tokio::test]
1325 async fn test_enqueue_session_message() {
1326 crate::tests::common::clear_all_test_state();
1327 let session_id = format!("ndjson-enqueue-msg-{}", uuid::Uuid::new_v4());
1328
1329 let msg = Message {
1330 role: crate::types::MessageRole::User,
1331 content: "streaming test".to_string(),
1332 ..Default::default()
1333 };
1334 enqueue_session_message(&session_id, &msg);
1335
1336 SessionWriter::drain().await;
1338
1339 let path = get_jsonl_path(&session_id);
1340 assert!(path.exists());
1341 let content = fs::read_to_string(&path).await.unwrap();
1342 assert!(content.contains("streaming test"));
1343
1344 {
1346 let mut pending = SESSION_PENDING.lock().unwrap();
1347 pending.remove(&session_id);
1348 }
1349 let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1350 }
1351
1352 #[tokio::test]
1353 async fn test_multiple_sessions_drain() {
1354 crate::tests::common::clear_all_test_state();
1355 let session_id1 = format!("ndjson-multi-1-{}", uuid::Uuid::new_v4());
1356 let session_id2 = format!("ndjson-multi-2-{}", uuid::Uuid::new_v4());
1357
1358 SessionWriter::enqueue(&session_id1, "{\"s\":1}".to_string());
1359 SessionWriter::enqueue(&session_id2, "{\"s\":2}".to_string());
1360 SessionWriter::enqueue(&session_id1, "{\"s\":3}".to_string());
1361
1362 SessionWriter::drain().await;
1363
1364 let content1 = fs::read_to_string(get_jsonl_path(&session_id1)).await.unwrap();
1365 let content2 = fs::read_to_string(get_jsonl_path(&session_id2)).await.unwrap();
1366
1367 let lines1: Vec<&str> = content1.lines().collect();
1368 let lines2: Vec<&str> = content2.lines().collect();
1369 assert_eq!(lines1.len(), 2);
1370 assert_eq!(lines2.len(), 1);
1371
1372 {
1374 let mut pending = SESSION_PENDING.lock().unwrap();
1375 pending.remove(&session_id1);
1376 pending.remove(&session_id2);
1377 }
1378 let _ = fs::remove_dir_all(get_session_path(&session_id1)).await;
1379 let _ = fs::remove_dir_all(get_session_path(&session_id2)).await;
1380 }
1381
1382 #[tokio::test]
1383 async fn test_sidechain_jsonl_path() {
1384 let path = get_sidechain_jsonl_path("test-session", "agent-123");
1385 assert!(path.to_string_lossy().contains("test-session"));
1386 assert!(path.to_string_lossy().contains("sidechains"));
1387 assert!(path.to_string_lossy().contains("agent-123.jsonl"));
1388 }
1389
1390 #[tokio::test]
1391 async fn test_record_sidechain_transcript() {
1392 crate::tests::common::clear_all_test_state();
1393 let session_id = format!("sidechain-test-{}", uuid::Uuid::new_v4());
1394 let agent_id = "test-agent-001";
1395
1396 let msgs = vec![
1397 Message {
1398 role: crate::types::MessageRole::Assistant,
1399 content: "subagent start".to_string(),
1400 ..Default::default()
1401 },
1402 Message {
1403 role: crate::types::MessageRole::User,
1404 content: "tool result".to_string(),
1405 ..Default::default()
1406 },
1407 ];
1408
1409 record_sidechain_transcript(&session_id, &msgs, agent_id, None)
1410 .await
1411 .unwrap();
1412
1413 let path = get_sidechain_jsonl_path(&session_id, agent_id);
1415 assert!(path.exists());
1416
1417 let content = fs::read_to_string(&path).await.unwrap();
1419 let lines: Vec<&str> = content.lines().collect();
1420 assert_eq!(lines.len(), 2); for line in &lines {
1423 let entry: SessionEntry = serde_json::from_str(line).unwrap();
1424 assert_eq!(entry.entry_type.as_deref(), Some("message"));
1425 let data = entry.data.unwrap();
1427 assert!(data.get("isSidechain").unwrap().as_bool().unwrap());
1428 assert_eq!(
1429 data.get("agentId").unwrap().as_str().unwrap(),
1430 agent_id
1431 );
1432 }
1433
1434 let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1436 }
1437
1438 #[tokio::test]
1439 async fn test_sidechain_parent_uuid_chaining() {
1440 crate::tests::common::clear_all_test_state();
1441 let session_id = format!("sidechain-uuid-{}", uuid::Uuid::new_v4());
1442 let agent_id = "uuid-agent";
1443
1444 let starting_uuid = "start-uuid-123".to_string();
1445 let msgs = vec![
1446 Message {
1447 role: crate::types::MessageRole::Assistant,
1448 content: "msg1".to_string(),
1449 ..Default::default()
1450 },
1451 Message {
1452 role: crate::types::MessageRole::Assistant,
1453 content: "msg2".to_string(),
1454 ..Default::default()
1455 },
1456 ];
1457
1458 record_sidechain_transcript(&session_id, &msgs, agent_id, Some(starting_uuid))
1459 .await
1460 .unwrap();
1461
1462 let content =
1463 fs::read_to_string(get_sidechain_jsonl_path(&session_id, agent_id))
1464 .await
1465 .unwrap();
1466 let lines: Vec<&str> = content.lines().collect();
1467
1468 let first: SessionEntry = serde_json::from_str(lines[0]).unwrap();
1470 assert_eq!(
1471 first.data.unwrap().get("parentUuid").unwrap().as_str().unwrap(),
1472 "start-uuid-123"
1473 );
1474
1475 let second: SessionEntry = serde_json::from_str(lines[1]).unwrap();
1477 let second_data = second.data.unwrap();
1478 let second_parent = second_data.get("parentUuid");
1479 assert!(second_parent.is_some());
1480 assert_ne!(
1482 second_parent.unwrap().as_str().unwrap(),
1483 "start-uuid-123"
1484 );
1485
1486 let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1488 }
1489
1490 #[tokio::test]
1491 async fn test_insert_message_chain_sidechain() {
1492 crate::tests::common::clear_all_test_state();
1493 let session_id = format!("insert-chain-{}", uuid::Uuid::new_v4());
1494 let msgs = vec![Message {
1495 role: crate::types::MessageRole::Assistant,
1496 content: "chain msg".to_string(),
1497 ..Default::default()
1498 }];
1499
1500 insert_message_chain(
1501 &session_id,
1502 &msgs,
1503 true,
1504 Some("chain-agent".to_string()),
1505 None,
1506 )
1507 .await
1508 .unwrap();
1509
1510 let path = get_sidechain_jsonl_path(&session_id, "chain-agent");
1511 assert!(path.exists());
1512
1513 let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1515 }
1516
1517 #[tokio::test]
1518 async fn test_insert_message_chain_main() {
1519 crate::tests::common::clear_all_test_state();
1520 let session_id = format!("insert-main-{}", uuid::Uuid::new_v4());
1521 let msgs = vec![Message {
1522 role: crate::types::MessageRole::User,
1523 content: "main msg".to_string(),
1524 ..Default::default()
1525 }];
1526
1527 insert_message_chain(&session_id, &msgs, false, None, None)
1528 .await
1529 .unwrap();
1530
1531 let path = get_jsonl_path(&session_id);
1533 assert!(path.exists());
1534
1535 let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1537 }
1538
1539 #[tokio::test]
1540 async fn test_sidechain_message_entry() {
1541 let msg = Message {
1542 role: crate::types::MessageRole::Assistant,
1543 content: "test".to_string(),
1544 ..Default::default()
1545 };
1546 let entry = SessionEntry::sidechain_message(&msg, "agent-1", Some("parent-uuid"));
1547
1548 assert_eq!(entry.entry_type.as_deref(), Some("message"));
1549 let data = entry.data.unwrap();
1550 assert!(data.get("isSidechain").unwrap().is_boolean());
1551 assert_eq!(data.get("agentId").unwrap().as_str().unwrap(), "agent-1");
1552 assert_eq!(
1553 data.get("parentUuid").unwrap().as_str().unwrap(),
1554 "parent-uuid"
1555 );
1556 }
1557}