Skip to main content

magic_bird/store/
sessions.rs

1//! Session storage operations.
2
3use std::fs;
4
5use duckdb::params;
6
7use super::atomic;
8use super::Store;
9use crate::config::StorageMode;
10use crate::schema::SessionRecord;
11use crate::Result;
12
13impl Store {
14    /// Write a session record to the store.
15    ///
16    /// Behavior depends on storage mode:
17    /// - Parquet: Creates a new Parquet file in the appropriate date partition
18    /// - DuckDB: Inserts directly into the local.sessions
19    ///
20    /// Sessions are written lazily on first invocation from that session.
21    pub fn write_session(&self, record: &SessionRecord) -> Result<()> {
22        match self.config.storage_mode {
23            StorageMode::Parquet => self.write_session_parquet(record),
24            StorageMode::DuckDB => self.write_session_duckdb(record),
25        }
26    }
27
28    /// Write session to a Parquet file (multi-writer safe).
29    fn write_session_parquet(&self, record: &SessionRecord) -> Result<()> {
30        let conn = self.connection()?;
31
32        // Ensure the partition directory exists
33        let partition_dir = self.config.sessions_dir(&record.date);
34        fs::create_dir_all(&partition_dir)?;
35
36        // Generate filename: {session_id}.parquet
37        let filename = format!("{}.parquet", record.session_id);
38        let file_path = partition_dir.join(&filename);
39
40        // Write via DuckDB using COPY
41        conn.execute_batch(
42            r#"
43            CREATE OR REPLACE TEMP TABLE temp_session (
44                session_id VARCHAR,
45                client_id VARCHAR,
46                invoker VARCHAR,
47                invoker_pid INTEGER,
48                invoker_type VARCHAR,
49                registered_at TIMESTAMP,
50                cwd VARCHAR,
51                date DATE
52            );
53            "#,
54        )?;
55
56        conn.execute(
57            r#"
58            INSERT INTO temp_session VALUES (
59                ?, ?, ?, ?, ?, ?, ?, ?
60            )
61            "#,
62            params![
63                record.session_id,
64                record.client_id,
65                record.invoker,
66                record.invoker_pid,
67                record.invoker_type,
68                record.registered_at.to_rfc3339(),
69                record.cwd,
70                record.date.to_string(),
71            ],
72        )?;
73
74        // Atomic write: COPY to temp file, then rename
75        let temp_path = atomic::temp_path(&file_path);
76        conn.execute(
77            &format!(
78                "COPY temp_session TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
79                temp_path.display()
80            ),
81            [],
82        )?;
83        conn.execute("DROP TABLE temp_session", [])?;
84
85        // Rename temp to final (atomic on POSIX)
86        atomic::rename_into_place(&temp_path, &file_path)?;
87
88        Ok(())
89    }
90
91    /// Write session directly to DuckDB table.
92    fn write_session_duckdb(&self, record: &SessionRecord) -> Result<()> {
93        let conn = self.connection()?;
94
95        conn.execute(
96            r#"
97            INSERT INTO local.sessions VALUES (
98                ?, ?, ?, ?, ?, ?, ?, ?
99            )
100            "#,
101            params![
102                record.session_id,
103                record.client_id,
104                record.invoker,
105                record.invoker_pid,
106                record.invoker_type,
107                record.registered_at.to_rfc3339(),
108                record.cwd,
109                record.date.to_string(),
110            ],
111        )?;
112
113        Ok(())
114    }
115
116    /// Check if a session exists in the store.
117    pub fn session_exists(&self, session_id: &str) -> Result<bool> {
118        let conn = self.connection()?;
119
120        let result: std::result::Result<i64, _> = conn.query_row(
121            &format!(
122                "SELECT COUNT(*) FROM sessions WHERE session_id = '{}'",
123                session_id
124            ),
125            [],
126            |row| row.get(0),
127        );
128
129        match result {
130            Ok(count) => Ok(count > 0),
131            Err(e) => {
132                if e.to_string().contains("No files found") {
133                    Ok(false)
134                } else {
135                    Err(e.into())
136                }
137            }
138        }
139    }
140
141    /// Ensure a session is registered, creating it if needed.
142    ///
143    /// This is called lazily when an invocation is recorded. If the session
144    /// doesn't exist, it creates a new session record.
145    pub fn ensure_session(&self, record: &SessionRecord) -> Result<()> {
146        if !self.session_exists(&record.session_id)? {
147            self.write_session(record)?;
148        }
149        Ok(())
150    }
151
152    /// Count total sessions in the store.
153    pub fn session_count(&self) -> Result<i64> {
154        let conn = self.connection()?;
155
156        let result: std::result::Result<i64, _> =
157            conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0));
158
159        match result {
160            Ok(count) => Ok(count),
161            Err(e) => {
162                if e.to_string().contains("No files found") {
163                    Ok(0)
164                } else {
165                    Err(e.into())
166                }
167            }
168        }
169    }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::Config;
    use tempfile::TempDir;

    /// Build an initialized store rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned with the store so the directory stays alive
    /// for as long as the caller holds it.
    fn setup_store() -> (TempDir, Store) {
        let root = TempDir::new().unwrap();
        let store = {
            let config = Config::with_root(root.path());
            initialize(&config).unwrap();
            Store::open(config).unwrap()
        };
        (root, store)
    }

    /// Writing one session makes the session count one.
    #[test]
    fn test_write_session() {
        let (_root, store) = setup_store();

        let session = SessionRecord::new("zsh-12345", "user@laptop", "zsh", 12345, "shell");
        store.write_session(&session).unwrap();

        assert_eq!(store.session_count().unwrap(), 1);
    }

    /// A session is reported as existing only after it has been written.
    #[test]
    fn test_session_exists() {
        let (_root, store) = setup_store();

        // Nothing has been written yet.
        let before = store.session_exists("zsh-12345").unwrap();
        assert!(!before);

        let session = SessionRecord::new("zsh-12345", "user@laptop", "zsh", 12345, "shell");
        store.write_session(&session).unwrap();

        // The same id is now found.
        let after = store.session_exists("zsh-12345").unwrap();
        assert!(after);
    }

    /// `ensure_session` writes a session that is not yet in the store.
    #[test]
    fn test_ensure_session_creates_new() {
        let (_root, store) = setup_store();

        let session = SessionRecord::new("bash-67890", "user@laptop", "bash", 67890, "shell");
        store.ensure_session(&session).unwrap();

        assert!(store.session_exists("bash-67890").unwrap());
        assert_eq!(store.session_count().unwrap(), 1);
    }

    /// Calling `ensure_session` twice with the same record stores it once.
    #[test]
    fn test_ensure_session_idempotent() {
        let (_root, store) = setup_store();

        let session = SessionRecord::new("zsh-11111", "user@laptop", "zsh", 11111, "shell");
        for _ in 0..2 {
            store.ensure_session(&session).unwrap();
        }

        assert_eq!(store.session_count().unwrap(), 1);
    }

    /// A freshly initialized store reports zero sessions.
    #[test]
    fn test_session_count_empty() {
        let (_root, store) = setup_store();

        assert_eq!(store.session_count().unwrap(), 0);
    }
}
272}