1use anyhow::{Result, bail};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Deserializer, Serialize, Serializer, de::DeserializeOwned};
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::Arc;
12use tokio::fs;
13use tokio::io::AsyncWriteExt;
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash)]
17pub struct SessionId(pub String);
18
19impl SessionId {
20 pub fn new() -> Self {
22 Self(uuid::Uuid::new_v4().to_string())
23 }
24}
25
26impl Default for SessionId {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl std::fmt::Display for SessionId {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 write!(f, "{}", self.0)
35 }
36}
37
38impl Serialize for SessionId {
39 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
40 where
41 S: Serializer,
42 {
43 serializer.serialize_str(&self.0)
44 }
45}
46
47impl<'de> Deserialize<'de> for SessionId {
48 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
49 where
50 D: Deserializer<'de>,
51 {
52 let s = String::deserialize(deserializer)?;
53 Ok(Self(s))
54 }
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct UserMessage {
60 pub content: String,
62 pub timestamp: DateTime<Utc>,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct AgentResponse {
69 pub content: String,
71 pub session_id: Option<String>,
73 pub seed_id: Option<String>,
75 pub phase_reached: Option<String>,
77 pub evaluation_passed: Option<bool>,
79 pub timestamp: DateTime<Utc>,
81 #[serde(skip_serializing_if = "Option::is_none")]
85 pub trajectory_range: Option<TrajectoryRange>,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
90pub struct TrajectoryRange {
91 pub start: usize,
93 pub end: usize,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct TrajectoryStepRecord {
105 pub tool_name: String,
107 pub tool_args: serde_json::Value,
109 pub output_summary: String,
111 pub duration_ms: u64,
113 pub is_error: bool,
115 pub tool_call_id: String,
117 pub timestamp: DateTime<Utc>,
119}
120
121pub type SessionMetadata = std::collections::HashMap<String, serde_json::Value>;
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct Session {
131 pub id: SessionId,
133 pub user_id: String,
135 #[serde(default)]
137 pub user_messages: Vec<UserMessage>,
138 #[serde(default)]
140 pub agent_responses: Vec<AgentResponse>,
141 #[serde(default, skip_serializing_if = "Vec::is_empty")]
145 pub trajectory_steps: Vec<TrajectoryStepRecord>,
146 #[serde(skip_serializing_if = "Option::is_none")]
148 pub active_seed_id: Option<String>,
149 #[serde(skip_serializing_if = "Option::is_none")]
151 pub active_persona_id: Option<String>,
152 #[serde(default, skip_serializing_if = "Option::is_none")]
155 pub project_id: Option<String>,
156 pub created_at: DateTime<Utc>,
158 pub updated_at: DateTime<Utc>,
160 #[serde(default)]
162 pub metadata: SessionMetadata,
163}
164
165impl Session {
166 pub fn new(user_id: impl Into<String>) -> Self {
168 let now = Utc::now();
169 Self {
170 id: SessionId::new(),
171 user_id: user_id.into(),
172 user_messages: Vec::new(),
173 agent_responses: Vec::new(),
174 trajectory_steps: Vec::new(),
175 active_seed_id: None,
176 active_persona_id: None,
177 project_id: None,
178 created_at: now,
179 updated_at: now,
180 metadata: SessionMetadata::new(),
181 }
182 }
183
184 pub fn with_id(user_id: impl Into<String>, session_id: SessionId) -> Self {
186 let now = Utc::now();
187 Self {
188 id: session_id,
189 user_id: user_id.into(),
190 user_messages: Vec::new(),
191 agent_responses: Vec::new(),
192 trajectory_steps: Vec::new(),
193 active_seed_id: None,
194 active_persona_id: None,
195 project_id: None,
196 created_at: now,
197 updated_at: now,
198 metadata: SessionMetadata::new(),
199 }
200 }
201
202 pub fn add_user_message(&mut self, content: impl Into<String>) {
204 self.user_messages.push(UserMessage {
205 content: content.into(),
206 timestamp: Utc::now(),
207 });
208 self.updated_at = Utc::now();
209 }
210
211 pub fn add_agent_response(&mut self, response: AgentResponse) {
213 self.agent_responses.push(response);
214 self.updated_at = Utc::now();
215 }
216
217 pub fn extend_trajectory(&mut self, steps: Vec<TrajectoryStepRecord>) {
222 if steps.is_empty() {
223 return;
224 }
225 self.trajectory_steps.extend(steps);
226 self.updated_at = Utc::now();
227 }
228
229 pub fn trajectory(&self) -> &[TrajectoryStepRecord] {
231 &self.trajectory_steps
232 }
233
234 pub fn set_active_seed(&mut self, seed_id: Option<String>) {
236 self.active_seed_id = seed_id;
237 self.updated_at = Utc::now();
238 }
239
240 pub fn set_active_persona(&mut self, persona_id: Option<String>) {
242 self.active_persona_id = persona_id;
243 self.updated_at = Utc::now();
244 }
245
246 pub fn set_metadata(&mut self, key: impl Into<String>, value: serde_json::Value) {
248 self.metadata.insert(key.into(), value);
249 self.updated_at = Utc::now();
250 }
251
252 pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
254 self.metadata.get(key)
255 }
256
257 pub fn exchange_count(&self) -> usize {
259 self.user_messages.len().min(self.agent_responses.len())
260 }
261
262 pub fn is_empty(&self) -> bool {
264 self.user_messages.is_empty()
265 }
266}
267#[derive(Clone)]
272pub struct StateStore {
273 pub base_path: PathBuf,
275 session_locks: Arc<parking_lot::RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>>,
281}
282
283impl StateStore {
284 pub fn new(base_path: PathBuf) -> Result<Self> {
295 Ok(Self {
296 base_path,
297 session_locks: Arc::new(parking_lot::RwLock::new(HashMap::new())),
298 })
299 }
300
301 fn validate_category(category: &str) -> Result<()> {
303 if category.contains("..") || category.contains('\\') {
304 bail!("invalid category name: '{category}'");
305 }
306 if category.is_empty()
307 || category.starts_with('/')
308 || category.ends_with('/')
309 || category.contains("//")
310 {
311 bail!("invalid category name: '{category}'");
312 }
313 Ok(())
314 }
315
316 fn validate_name(name: &str) -> Result<()> {
318 if name.contains("..") || name.contains('/') || name.contains('\\') {
319 bail!("invalid file name: '{name}'");
320 }
321 Ok(())
322 }
323
324 async fn durable_write(
334 dir: &std::path::Path,
335 target: &std::path::Path,
336 content: &[u8],
337 ) -> Result<()> {
338 let file_name = target
339 .file_name()
340 .and_then(|n| n.to_str())
341 .unwrap_or("state");
342 let temp_path = dir.join(format!(
343 "{file_name}.{}.{}.tmp",
344 std::process::id(),
345 uuid::Uuid::new_v4()
346 ));
347 {
348 let mut file = fs::File::create(&temp_path).await?;
349 file.write_all(content).await?;
350 file.sync_all().await?;
351 }
352 tokio::fs::rename(&temp_path, target).await?;
353 if let Ok(dir_file) = fs::File::open(dir).await {
356 let _ = dir_file.sync_all().await;
357 }
358 Ok(())
359 }
360
361 pub async fn save_markdown(&self, category: &str, name: &str, content: &str) -> Result<()> {
362 Self::validate_category(category)?;
363 Self::validate_name(name)?;
364 let dir = self.base_path.join(category);
365 fs::create_dir_all(&dir).await?;
366 let path = dir.join(format!("{name}.md"));
367 Self::durable_write(&dir, &path, content.as_bytes()).await?;
368 Ok(())
369 }
370
371 pub async fn load_markdown(&self, category: &str, name: &str) -> Result<Option<String>> {
373 Self::validate_category(category)?;
374 Self::validate_name(name)?;
375 let path = self.base_path.join(category).join(format!("{name}.md"));
376 match fs::read_to_string(&path).await {
377 Ok(content) => Ok(Some(content)),
378 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
379 Err(e) => Err(e.into()),
380 }
381 }
382
383 pub async fn list_category(&self, category: &str) -> Result<Vec<String>> {
385 Self::validate_category(category)?;
386 let dir = self.base_path.join(category);
387 if !dir.exists() {
388 return Ok(Vec::new());
389 }
390 let mut entries = fs::read_dir(&dir).await?;
391 let mut names = Vec::new();
392 while let Some(entry) = entries.next_entry().await? {
393 let path = entry.path();
394 if let Some(ext) = path.extension()
395 && (ext == "md" || ext == "json")
396 && let Some(stem) = path.file_stem()
397 {
398 names.push(stem.to_string_lossy().into_owned());
399 }
400 }
401 names.sort();
402 Ok(names)
403 }
404
405 pub async fn save_json<T: Serialize>(
407 &self,
408 category: &str,
409 name: &str,
410 data: &T,
411 ) -> Result<()> {
412 Self::validate_category(category)?;
413 Self::validate_name(name)?;
414 let dir = self.base_path.join(category);
415 fs::create_dir_all(&dir).await?;
416 let path = dir.join(format!("{name}.json"));
417 let content = serde_json::to_string_pretty(data)?;
418 Self::durable_write(&dir, &path, content.as_bytes()).await?;
419 Ok(())
420 }
421
422 pub async fn load_json<T: DeserializeOwned>(
424 &self,
425 category: &str,
426 name: &str,
427 ) -> Result<Option<T>> {
428 Self::validate_category(category)?;
429 Self::validate_name(name)?;
430 let path = self.base_path.join(category).join(format!("{name}.json"));
431 match fs::read_to_string(&path).await {
432 Ok(content) => Ok(Some(serde_json::from_str(&content)?)),
433 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
434 Err(e) => Err(e.into()),
435 }
436 }
437
438 pub async fn delete_file(&self, category: &str, name: &str) -> Result<bool> {
440 Self::validate_category(category)?;
441 Self::validate_name(name)?;
442 let path = self.base_path.join(category).join(format!("{name}.json"));
443 if path.exists() {
444 tokio::fs::remove_file(path).await?;
445 Ok(true)
446 } else {
447 let path = self.base_path.join(category).join(format!("{name}.md"));
448 if path.exists() {
449 tokio::fs::remove_file(path).await?;
450 Ok(true)
451 } else {
452 Ok(false)
453 }
454 }
455 }
456}
457
458impl std::fmt::Debug for StateStore {
459 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
460 f.debug_struct("StateStore")
461 .field("base_path", &self.base_path)
462 .finish()
463 }
464}
465
466impl StateStore {
467 pub async fn save_session(&self, session: &Session) -> Result<()> {
469 self.save_json("sessions", &session.id.0, session).await
470 }
471
472 pub async fn update_session_with<F>(
483 &self,
484 session_id: &SessionId,
485 f: F,
486 ) -> Result<Option<Session>>
487 where
488 F: FnOnce(&mut Session) -> Result<()>,
489 {
490 let lock = Self::session_lock(&self.session_locks, &session_id.0);
491 let _guard = lock.lock().await;
492 let mut session = match self.load_session(session_id).await? {
493 Some(s) => s,
494 None => return Ok(None),
495 };
496 f(&mut session)?;
497 self.save_session(&session).await?;
498 Ok(Some(session))
499 }
500
501 fn session_lock(
505 map: &parking_lot::RwLock<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
506 session_id: &str,
507 ) -> Arc<tokio::sync::Mutex<()>> {
508 if let Some(lock) = map.read().get(session_id) {
510 return Arc::clone(lock);
511 }
512 Arc::clone(
514 map.write()
515 .entry(session_id.to_string())
516 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
517 )
518 }
519
520 pub async fn save_session_with_prune(
522 &self,
523 session: &Session,
524 prune_config: &PruneConfig,
525 ) -> Result<()> {
526 self.save_session(session).await?;
527 let store = self.clone();
529 let config = prune_config.clone();
530 tokio::spawn(async move {
531 if let Err(e) = store.prune_sessions(&config).await {
532 tracing::warn!(error = %e, "Background session pruning failed");
533 }
534 });
535 Ok(())
536 }
537
538 pub async fn load_session(&self, session_id: &SessionId) -> Result<Option<Session>> {
540 self.load_json("sessions", &session_id.0).await
541 }
542
543 pub async fn load_all_sessions(&self) -> Result<Vec<Session>> {
553 let mut sessions = Vec::new();
554 if let Ok(names) = self.list_category("sessions").await {
555 for name in names {
556 if let Ok(Some(session)) = self.load_json::<Session>("sessions", &name).await {
557 sessions.push(session);
558 }
559 }
560 }
561 Ok(sessions)
562 }
563
564 pub async fn load_sessions_for_promotion(&self, since: DateTime<Utc>) -> Result<Vec<Session>> {
575 let mut sessions = Vec::new();
576 if let Ok(names) = self.list_category("sessions").await {
577 for name in names {
578 if let Ok(Some(session)) = self.load_json::<Session>("sessions", &name).await {
579 if session.updated_at < since {
582 continue;
583 }
584 sessions.push(session);
585 }
586 }
587 }
588 Ok(sessions)
589 }
590
591 pub async fn list_sessions(&self) -> Result<Vec<SessionSummary>> {
593 let mut sessions = Vec::new();
594
595 if let Ok(names) = self.list_category("sessions").await {
596 for name in names {
597 if let Ok(Some(session)) = self.load_json::<Session>("sessions", &name).await {
598 sessions.push(SessionSummary {
599 id: session.id.0.clone(),
600 user_id: session.user_id.clone(),
601 message_count: session.user_messages.len(),
602 title: session
603 .metadata
604 .get("title")
605 .and_then(|v| v.as_str())
606 .map(String::from)
607 .or_else(|| {
608 session.user_messages.first().map(|m| {
610 let s = m.content.lines().next().unwrap_or("");
611 if s.len() > 60 {
612 format!("{}…", &s[..s.ceil_char_boundary(59)])
613 } else {
614 s.to_string()
615 }
616 })
617 }),
618 active_seed_id: session.active_seed_id.clone(),
619 project_id: session
620 .project_id
621 .clone()
622 .or_else(|| {
624 session
625 .metadata
626 .get("project_id")
627 .and_then(|v| v.as_str())
628 .map(String::from)
629 })
630 .or_else(|| {
631 session
632 .metadata
633 .get("project_ids")
634 .and_then(|v| v.as_str())
635 .and_then(|s| s.split(',').next().map(String::from))
636 }),
637 created_at: session.created_at,
638 updated_at: session.updated_at,
639 });
640 }
641 }
642 }
643
644 sessions.sort_by_key(|b| std::cmp::Reverse(b.updated_at));
646 Ok(sessions)
647 }
648
649 pub async fn delete_session(&self, session_id: &SessionId) -> Result<bool> {
651 let path = self
652 .base_path
653 .join("sessions")
654 .join(format!("{}.json", session_id.0));
655 match fs::remove_file(&path).await {
656 Ok(()) => {
657 tracing::info!(session_id = %session_id, "Session deleted");
658 Ok(true)
659 }
660 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
661 Err(e) => Err(e.into()),
662 }
663 }
664
665 pub async fn get_or_create_session(
667 &self,
668 user_id: &str,
669 session_id: Option<&SessionId>,
670 ) -> Result<Session> {
671 if let Some(sid) = session_id
672 && let Some(existing) = self.load_session(sid).await?
673 {
674 return Ok(existing);
675 }
676
677 let session = match session_id {
679 Some(sid) => Session::with_id(user_id, sid.clone()),
680 None => Session::new(user_id),
681 };
682
683 self.save_session(&session).await?;
684 Ok(session)
685 }
686
687 pub async fn update_session(&self, session: &Session) -> Result<()> {
689 self.save_session(session).await
690 }
691
692 pub async fn move_session_to_project(
696 &self,
697 session_id: &SessionId,
698 project_id: Option<&str>,
699 ) -> Result<bool> {
700 let project_id_owned = project_id.map(String::from);
703 let updated = self
704 .update_session_with(session_id, |session| {
705 session.project_id = project_id_owned;
706 session.updated_at = Utc::now();
707 Ok(())
708 })
709 .await?;
710 Ok(updated.is_some())
711 }
712
713 pub async fn prune_sessions(&self, config: &PruneConfig) -> Result<usize> {
718 let mut sessions = self.list_sessions().await?;
719 let mut pruned = 0;
720
721 if config.ttl_hours > 0 {
723 let cutoff = Utc::now() - chrono::Duration::hours(config.ttl_hours as i64);
724 let to_prune_ttl: std::collections::HashSet<String> = sessions
725 .iter()
726 .filter(|s| s.updated_at < cutoff)
727 .map(|s| s.id.clone())
728 .collect();
729
730 for id in &to_prune_ttl {
731 let sid = SessionId(id.clone());
732 if self.delete_session(&sid).await.is_ok() {
733 pruned += 1;
734 }
735 }
736
737 sessions.retain(|s| !to_prune_ttl.contains(&s.id));
739 }
740
741 if config.max_sessions > 0 && sessions.len() > config.max_sessions {
743 let excess = sessions.len() - config.max_sessions;
745 for session in sessions.into_iter().rev().take(excess) {
746 let sid = SessionId(session.id);
747 if self.delete_session(&sid).await.is_ok() {
748 pruned += 1;
749 }
750 }
751 }
752
753 if pruned > 0 {
754 tracing::info!(pruned = pruned, "Session pruning completed");
755 }
756
757 Ok(pruned)
758 }
759
760 pub async fn prune_agents_by_config(
765 &self,
766 max_entries: usize,
767 ttl_hours: u64,
768 batch_size: usize,
769 ) -> Result<usize> {
770 let mut pruned = 0usize;
771
772 let names = self.list_category("agents").await?;
773 if names.is_empty() {
774 return Ok(0);
775 }
776
777 let now = Utc::now();
778
779 let mut remaining: Vec<(String, DateTime<Utc>)> = Vec::with_capacity(names.len());
781
782 if ttl_hours > 0 {
783 let cutoff = now - chrono::Duration::hours(ttl_hours as i64);
784 for name in &names {
785 if let Ok(Some(info)) = self
787 .load_json::<crate::types::AgentInfo>("agents", name)
788 .await
789 {
790 if info.created_at < cutoff {
791 if self.delete_file("agents", name).await.unwrap_or(false) {
792 pruned += 1;
793 }
794 } else {
795 remaining.push((name.clone(), info.created_at));
796 }
797 }
798 }
799 } else {
800 for name in &names {
802 if let Ok(Some(info)) = self
803 .load_json::<crate::types::AgentInfo>("agents", name)
804 .await
805 {
806 remaining.push((name.clone(), info.created_at));
807 }
808 }
809 }
810
811 if max_entries > 0 && remaining.len() > max_entries {
813 remaining.sort_by_key(|a| a.1);
815
816 let excess = remaining.len() - max_entries;
817 let to_delete = excess.min(batch_size);
818
819 for (name, _) in remaining.iter().take(to_delete) {
820 if self.delete_file("agents", name).await.unwrap_or(false) {
821 pruned += 1;
822 }
823 }
824 }
825
826 if pruned > 0 {
827 tracing::info!(pruned = pruned, "Agent filesystem pruning completed");
828 }
829
830 Ok(pruned)
831 }
832}
833
834#[derive(Debug, Clone, Serialize, Deserialize)]
836pub struct SessionSummary {
837 pub id: String,
839 pub user_id: String,
841 pub message_count: usize,
843 #[serde(skip_serializing_if = "Option::is_none")]
846 pub title: Option<String>,
847 #[serde(skip_serializing_if = "Option::is_none")]
849 pub active_seed_id: Option<String>,
850 #[serde(skip_serializing_if = "Option::is_none")]
852 pub project_id: Option<String>,
853 pub created_at: DateTime<Utc>,
855 pub updated_at: DateTime<Utc>,
857}
858
859#[derive(Debug, Clone)]
861pub struct PruneConfig {
862 pub max_sessions: usize,
864 pub ttl_hours: u64,
866}
867
868impl Default for PruneConfig {
869 fn default() -> Self {
870 Self {
871 max_sessions: 100,
872 ttl_hours: 168, }
874 }
875}
876
877pub struct PruneThrottle {
879 last_prune: std::sync::Mutex<Option<std::time::Instant>>,
881 cooldown_secs: u64,
883}
884
885impl PruneThrottle {
886 pub fn new(cooldown_secs: u64) -> Self {
888 Self {
889 last_prune: std::sync::Mutex::new(None),
890 cooldown_secs,
891 }
892 }
893
894 pub fn should_prune(&self) -> bool {
897 let mut guard = self.last_prune.lock().unwrap_or_else(|e| {
900 tracing::warn!("PruneThrottle mutex poisoned, recovering: {e}");
901 e.into_inner()
902 });
903 let now = std::time::Instant::now();
904 match *guard {
905 Some(last) => {
906 if now.duration_since(last).as_secs() >= self.cooldown_secs {
907 *guard = Some(now);
908 true
909 } else {
910 false
911 }
912 }
913 None => {
914 *guard = Some(now);
915 true
916 }
917 }
918 }
919}
920
921#[cfg(test)]
922mod tests {
923 use super::*;
924 #[tokio::test]
925 async fn test_session_creation_and_persistence() {
926 let temp_dir = tempfile::tempdir().unwrap();
927 let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();
928
929 let mut session = Session::new("user-123");
931 session.add_user_message("Hello");
932
933 store.save_session(&session).await.unwrap();
935 let loaded = store.load_session(&session.id).await.unwrap();
936 assert!(loaded.is_some());
937 let loaded = loaded.unwrap();
938 assert_eq!(loaded.user_id, "user-123");
939 assert_eq!(loaded.user_messages.len(), 1);
940 }
941
942 #[tokio::test]
943 async fn test_session_list_sorts_by_updated() {
944 let temp_dir = tempfile::tempdir().unwrap();
945 let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();
946
947 for i in 0..3 {
949 let mut session = Session::new(format!("user-{}", i));
950 session.add_user_message(format!("Message {}", i));
951 store.save_session(&session).await.unwrap();
952 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
953 }
954
955 let sessions = store.list_sessions().await.unwrap();
956 assert_eq!(sessions.len(), 3);
957 assert_eq!(sessions[0].user_id, "user-2");
959 }
960
961 #[tokio::test]
962 async fn test_delete_session() {
963 let temp_dir = tempfile::tempdir().unwrap();
964 let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();
965
966 let session = Session::new("user-123");
967 store.save_session(&session).await.unwrap();
968
969 let deleted = store.delete_session(&session.id).await.unwrap();
971 assert!(deleted);
972
973 let loaded = store.load_session(&session.id).await.unwrap();
974 assert!(loaded.is_none());
975 }
976
977 #[tokio::test]
978 async fn test_get_or_create_session_existing() {
979 let temp_dir = tempfile::tempdir().unwrap();
980 let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();
981
982 let mut existing = Session::new("user-123");
983 existing.add_user_message("Original message");
984 store.save_session(&existing).await.unwrap();
985
986 let retrieved = store
988 .get_or_create_session("user-123", Some(&existing.id))
989 .await
990 .unwrap();
991 assert_eq!(retrieved.id, existing.id);
992 assert_eq!(retrieved.user_messages.len(), 1);
993 }
994
995 #[tokio::test]
996 async fn test_get_or_create_session_new() {
997 let temp_dir = tempfile::tempdir().unwrap();
998 let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();
999
1000 let session = store.get_or_create_session("user-456", None).await.unwrap();
1002 assert_eq!(session.user_id, "user-456");
1003 assert!(session.user_messages.is_empty());
1004 }
1005
1006 #[tokio::test]
1007 async fn test_prune_sessions_by_count() {
1008 let temp_dir = tempfile::tempdir().unwrap();
1009 let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();
1010
1011 for i in 0..5 {
1013 let mut session = Session::new(format!("user-{}", i));
1014 session.add_user_message(format!("Message {}", i));
1015 store.save_session(&session).await.unwrap();
1016 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1017 }
1018
1019 let config = PruneConfig {
1021 max_sessions: 3,
1022 ttl_hours: 0,
1023 };
1024 let pruned = store.prune_sessions(&config).await.unwrap();
1025 assert_eq!(pruned, 2);
1026
1027 let remaining = store.list_sessions().await.unwrap();
1028 assert_eq!(remaining.len(), 3);
1029 let remaining_ids: Vec<&str> = remaining.iter().map(|s| s.user_id.as_str()).collect();
1031 assert!(remaining_ids.contains(&"user-2"));
1032 assert!(remaining_ids.contains(&"user-3"));
1033 assert!(remaining_ids.contains(&"user-4"));
1034 }
1035
1036 #[tokio::test]
1037 async fn test_prune_sessions_by_ttl() {
1038 let temp_dir = tempfile::tempdir().unwrap();
1039 let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();
1040
1041 let mut old_session = Session::new("old-user");
1043 old_session.updated_at = Utc::now() - chrono::Duration::hours(48);
1044 store.save_session(&old_session).await.unwrap();
1045
1046 let mut recent_session = Session::new("recent-user");
1048 recent_session.add_user_message("Hello");
1049 store.save_session(&recent_session).await.unwrap();
1050
1051 let config = PruneConfig {
1053 max_sessions: 0,
1054 ttl_hours: 24,
1055 };
1056 let pruned = store.prune_sessions(&config).await.unwrap();
1057 assert_eq!(pruned, 1);
1058
1059 let remaining = store.list_sessions().await.unwrap();
1060 assert_eq!(remaining.len(), 1);
1061 assert_eq!(remaining[0].user_id, "recent-user");
1062 }
1063
1064 #[tokio::test]
1065 async fn test_load_sessions_for_promotion_filters_by_cutoff() {
1066 let temp_dir = tempfile::tempdir().unwrap();
1069 let store = StateStore::new(temp_dir.path().to_path_buf()).unwrap();
1070
1071 let mut old_session = Session::new("old-user");
1073 old_session.updated_at = Utc::now() - chrono::Duration::hours(48);
1074 store.save_session(&old_session).await.unwrap();
1075
1076 let recent_session = Session::new("recent-user");
1078 store.save_session(&recent_session).await.unwrap();
1079
1080 let cutoff = Utc::now() - chrono::Duration::hours(24);
1082 let sessions = store.load_sessions_for_promotion(cutoff).await.unwrap();
1083 assert_eq!(sessions.len(), 1, "old session must be filtered out");
1084 assert_eq!(sessions[0].user_id, "recent-user");
1085
1086 let far_cutoff = Utc::now() - chrono::Duration::days(365);
1088 let all = store.load_sessions_for_promotion(far_cutoff).await.unwrap();
1089 assert_eq!(all.len(), 2);
1090 }
1091}