Skip to main content

matrixcode_core/session/
session.rs

1//! Session data and file locking
2
3use chrono::Utc;
4use serde::{Deserialize, Serialize};
5use std::path::{Path, PathBuf};
6
7use super::metadata::{MessageSummary, SessionMetadata};
8use crate::providers::Message;
9
10/// Full session data including messages.
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Session {
13    pub metadata: SessionMetadata,
14    /// Full message history for display (TUI shows this).
15    #[serde(default)]
16    pub full_messages: Vec<Message>,
17    /// Compressed messages for API requests (Agent uses this).
18    #[serde(default)]
19    pub compressed_messages: Vec<Message>,
20    /// Summaries of compressed messages (for TUI history view).
21    #[serde(default)]
22    pub message_summaries: Vec<MessageSummary>,
23    /// Legacy field - migrated to full_messages on load.
24    #[serde(default, skip_serializing)]
25    pub messages: Vec<Message>,
26}
27
28impl Session {
29    /// Create a new empty session.
30    pub fn new(project_path: Option<&Path>) -> Self {
31        Self {
32            metadata: SessionMetadata::new(project_path),
33            full_messages: Vec::new(),
34            compressed_messages: Vec::new(),
35            message_summaries: Vec::new(),
36            messages: Vec::new(),
37        }
38    }
39
40    /// Create a session from existing messages.
41    pub fn from_messages(messages: Vec<Message>, project_path: Option<&Path>) -> Self {
42        let mut meta = SessionMetadata::new(project_path);
43        meta.message_count = messages.len();
44        Self {
45            metadata: meta,
46            full_messages: messages.clone(),
47            compressed_messages: Vec::new(),
48            message_summaries: messages
49                .iter()
50                .enumerate()
51                .map(|(i, m)| MessageSummary::from_message(m, i))
52                .collect(),
53            messages,
54        }
55    }
56
57    /// Get messages for API requests (use compressed if available).
58    pub fn api_messages(&self) -> &[Message] {
59        if self.compressed_messages.is_empty() {
60            &self.full_messages
61        } else {
62            &self.compressed_messages
63        }
64    }
65
66    /// Get messages for display (always full messages).
67    pub fn display_messages(&self) -> &[Message] {
68        &self.full_messages
69    }
70
71    /// Update metadata after a turn.
72    pub fn update_stats(&mut self, last_input_tokens: u32, total_output_tokens: u64) {
73        self.metadata.message_count = self.full_messages.len();
74        self.metadata.last_input_tokens = last_input_tokens as u64;
75        self.metadata.total_output_tokens = total_output_tokens;
76        self.metadata.updated_at = Utc::now();
77    }
78
79    /// Set compressed messages (called after compression).
80    pub fn set_compressed(&mut self, compressed: Vec<Message>, summaries: Vec<MessageSummary>) {
81        self.compressed_messages = compressed;
82        self.message_summaries = summaries;
83    }
84
85    /// Get the session name (user-defined or fallback).
86    pub fn name(&self) -> Option<&str> {
87        self.metadata.name.as_deref()
88    }
89
90    /// Migrate legacy messages field to full_messages.
91    pub fn migrate_legacy(&mut self) {
92        if !self.messages.is_empty() && self.full_messages.is_empty() {
93            log::info!(
94                "Migrating legacy session: {} messages -> full_messages",
95                self.messages.len()
96            );
97            self.full_messages = self.messages.clone();
98            self.message_summaries = self
99                .messages
100                .iter()
101                .enumerate()
102                .map(|(i, m)| MessageSummary::from_message(m, i))
103                .collect();
104            self.messages.clear();
105            log::info!(
106                "Migration complete: full_messages={}, summaries={}",
107                self.full_messages.len(),
108                self.message_summaries.len()
109            );
110        }
111    }
112}
113
114/// File lock for preventing concurrent access to session storage.
115pub struct SessionFileLock {
116    lock_path: PathBuf,
117    locked: bool,
118}
119
120impl SessionFileLock {
121    pub fn new(base_dir: &Path) -> Self {
122        Self {
123            lock_path: base_dir.join("sessions.lock"),
124            locked: false,
125        }
126    }
127
128    /// Acquire the lock (blocking with timeout).
129    pub fn acquire(&mut self, timeout_ms: u64) -> anyhow::Result<()> {
130        if self.locked {
131            return Ok(());
132        }
133
134        let start = std::time::Instant::now();
135
136        while start.elapsed().as_millis() < timeout_ms as u128 {
137            match std::fs::File::create_new(&self.lock_path) {
138                Ok(_) => {
139                    let lock_info = format!("{}:{}", std::process::id(), Utc::now().to_rfc3339());
140                    std::fs::write(&self.lock_path, lock_info)?;
141                    self.locked = true;
142                    return Ok(());
143                }
144                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
145                    if self.is_stale_lock()? {
146                        self.remove_stale_lock()?;
147                    }
148                    std::thread::sleep(std::time::Duration::from_millis(50));
149                }
150                Err(e) => {
151                    return Err(e.into());
152                }
153            }
154        }
155
156        anyhow::bail!(
157            "Failed to acquire session lock after {}ms timeout",
158            timeout_ms
159        )
160    }
161
162    fn is_stale_lock(&self) -> anyhow::Result<bool> {
163        if !self.lock_path.exists() {
164            return Ok(false);
165        }
166
167        if let Ok(content) = std::fs::read_to_string(&self.lock_path)
168            && let Some(pid_str) = content.split(':').next()
169            && let Ok(pid) = pid_str.parse::<u32>()
170            && !self.is_process_running(pid)
171        {
172            return Ok(true);
173        }
174
175        let metadata = std::fs::metadata(&self.lock_path)?;
176        let modified = metadata.modified()?;
177        let age = std::time::SystemTime::now()
178            .duration_since(modified)
179            .unwrap_or(std::time::Duration::ZERO);
180
181        Ok(age > std::time::Duration::from_secs(60))
182    }
183
184    fn is_process_running(&self, pid: u32) -> bool {
185        #[cfg(unix)]
186        {
187            std::path::Path::new(&format!("/proc/{}", pid)).exists()
188        }
189        #[cfg(windows)]
190        {
191            use std::process::Command;
192            let output = Command::new("tasklist")
193                .args(["/FI", &format!("PID eq {}", pid), "/NH"])
194                .output();
195
196            match output {
197                Ok(out) => {
198                    let stdout = String::from_utf8_lossy(&out.stdout);
199                    stdout.contains(&pid.to_string()) && !stdout.contains("No tasks")
200                }
201                Err(_) => true,
202            }
203        }
204    }
205
206    fn remove_stale_lock(&self) -> anyhow::Result<()> {
207        if self.lock_path.exists() {
208            std::fs::remove_file(&self.lock_path)?;
209        }
210        Ok(())
211    }
212
213    pub fn release(&mut self) -> anyhow::Result<()> {
214        if self.locked {
215            std::fs::remove_file(&self.lock_path)?;
216            self.locked = false;
217        }
218        Ok(())
219    }
220}
221
222impl Drop for SessionFileLock {
223    fn drop(&mut self) {
224        let _ = self.release();
225    }
226}