Skip to main content

matrixcode_core/session/
session.rs

1//! Session data and file locking
2
3use chrono::Utc;
4use serde::{Deserialize, Serialize};
5use std::path::{Path, PathBuf};
6
7use super::metadata::{SessionMetadata, MessageSummary};
8use crate::providers::Message;
9
10/// Full session data including messages.
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Session {
13    pub metadata: SessionMetadata,
14    /// Full message history for display (TUI shows this).
15    #[serde(default)]
16    pub full_messages: Vec<Message>,
17    /// Compressed messages for API requests (Agent uses this).
18    #[serde(default)]
19    pub compressed_messages: Vec<Message>,
20    /// Summaries of compressed messages (for TUI history view).
21    #[serde(default)]
22    pub message_summaries: Vec<MessageSummary>,
23    /// Legacy field - migrated to full_messages on load.
24    #[serde(default, skip_serializing)]
25    pub messages: Vec<Message>,
26}
27
28impl Session {
29    /// Create a new empty session.
30    pub fn new(project_path: Option<&Path>) -> Self {
31        Self {
32            metadata: SessionMetadata::new(project_path),
33            full_messages: Vec::new(),
34            compressed_messages: Vec::new(),
35            message_summaries: Vec::new(),
36            messages: Vec::new(),
37        }
38    }
39
40    /// Create a session from existing messages.
41    pub fn from_messages(messages: Vec<Message>, project_path: Option<&Path>) -> Self {
42        let mut meta = SessionMetadata::new(project_path);
43        meta.message_count = messages.len();
44        Self {
45            metadata: meta,
46            full_messages: messages.clone(),
47            compressed_messages: Vec::new(),
48            message_summaries: messages
49                .iter()
50                .enumerate()
51                .map(|(i, m)| MessageSummary::from_message(m, i))
52                .collect(),
53            messages,
54        }
55    }
56
57    /// Get messages for API requests (use compressed if available).
58    pub fn api_messages(&self) -> &[Message] {
59        if self.compressed_messages.is_empty() {
60            &self.full_messages
61        } else {
62            &self.compressed_messages
63        }
64    }
65
66    /// Get messages for display (always full messages).
67    pub fn display_messages(&self) -> &[Message] {
68        &self.full_messages
69    }
70
71    /// Update metadata after a turn.
72    pub fn update_stats(&mut self, last_input_tokens: u32, total_output_tokens: u64) {
73        self.metadata.message_count = self.full_messages.len();
74        self.metadata.last_input_tokens = last_input_tokens as u64;
75        self.metadata.total_output_tokens = total_output_tokens;
76        self.metadata.updated_at = Utc::now();
77    }
78
79    /// Set compressed messages (called after compression).
80    pub fn set_compressed(&mut self, compressed: Vec<Message>, summaries: Vec<MessageSummary>) {
81        self.compressed_messages = compressed;
82        self.message_summaries = summaries;
83    }
84
85    /// Get the session name (user-defined or fallback).
86    pub fn name(&self) -> Option<&str> {
87        self.metadata.name.as_deref()
88    }
89
90    /// Migrate legacy messages field to full_messages.
91    pub fn migrate_legacy(&mut self) {
92        if !self.messages.is_empty() && self.full_messages.is_empty() {
93            log::info!(
94                "Migrating legacy session: {} messages -> full_messages",
95                self.messages.len()
96            );
97            self.full_messages = self.messages.clone();
98            self.message_summaries = self
99                .messages
100                .iter()
101                .enumerate()
102                .map(|(i, m)| MessageSummary::from_message(m, i))
103                .collect();
104            self.messages.clear();
105            log::info!(
106                "Migration complete: full_messages={}, summaries={}",
107                self.full_messages.len(),
108                self.message_summaries.len()
109            );
110        }
111    }
112}
113
/// Lock-file based guard that prevents concurrent access to session storage.
///
/// The lock is represented by a file on disk; holding the lock means having
/// created that file.
pub struct SessionFileLock {
    /// Location of the lock file.
    lock_path: PathBuf,
    /// Whether this handle currently holds the lock.
    locked: bool,
}
119
120impl SessionFileLock {
121    pub fn new(base_dir: &Path) -> Self {
122        Self {
123            lock_path: base_dir.join("sessions.lock"),
124            locked: false,
125        }
126    }
127
128    /// Acquire the lock (blocking with timeout).
129    pub fn acquire(&mut self, timeout_ms: u64) -> anyhow::Result<()> {
130        if self.locked {
131            return Ok(());
132        }
133
134        let start = std::time::Instant::now();
135
136        while start.elapsed().as_millis() < timeout_ms as u128 {
137            match std::fs::File::create_new(&self.lock_path) {
138                Ok(_) => {
139                    let lock_info = format!("{}:{}", std::process::id(), Utc::now().to_rfc3339());
140                    std::fs::write(&self.lock_path, lock_info)?;
141                    self.locked = true;
142                    return Ok(());
143                }
144                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
145                    if self.is_stale_lock()? {
146                        self.remove_stale_lock()?;
147                    }
148                    std::thread::sleep(std::time::Duration::from_millis(50));
149                }
150                Err(e) => {
151                    return Err(e.into());
152                }
153            }
154        }
155
156        anyhow::bail!("Failed to acquire session lock after {}ms timeout", timeout_ms)
157    }
158
159    fn is_stale_lock(&self) -> anyhow::Result<bool> {
160        if !self.lock_path.exists() {
161            return Ok(false);
162        }
163
164        if let Ok(content) = std::fs::read_to_string(&self.lock_path)
165            && let Some(pid_str) = content.split(':').next()
166            && let Ok(pid) = pid_str.parse::<u32>()
167            && !self.is_process_running(pid)
168        {
169            return Ok(true);
170        }
171
172        let metadata = std::fs::metadata(&self.lock_path)?;
173        let modified = metadata.modified()?;
174        let age = std::time::SystemTime::now()
175            .duration_since(modified)
176            .unwrap_or(std::time::Duration::ZERO);
177
178        Ok(age > std::time::Duration::from_secs(60))
179    }
180
181    fn is_process_running(&self, pid: u32) -> bool {
182        #[cfg(unix)]
183        {
184            std::path::Path::new(&format!("/proc/{}", pid)).exists()
185        }
186        #[cfg(windows)]
187        {
188            use std::process::Command;
189            let output = Command::new("tasklist")
190                .args(["/FI", &format!("PID eq {}", pid), "/NH"])
191                .output();
192
193            match output {
194                Ok(out) => {
195                    let stdout = String::from_utf8_lossy(&out.stdout);
196                    stdout.contains(&pid.to_string()) && !stdout.contains("No tasks")
197                }
198                Err(_) => true,
199            }
200        }
201    }
202
203    fn remove_stale_lock(&self) -> anyhow::Result<()> {
204        if self.lock_path.exists() {
205            std::fs::remove_file(&self.lock_path)?;
206        }
207        Ok(())
208    }
209
210    pub fn release(&mut self) -> anyhow::Result<()> {
211        if self.locked {
212            std::fs::remove_file(&self.lock_path)?;
213            self.locked = false;
214        }
215        Ok(())
216    }
217}
218
219impl Drop for SessionFileLock {
220    fn drop(&mut self) {
221        let _ = self.release();
222    }
223}