1use std::fs;
4
5use duckdb::params;
6
7use super::atomic;
8use super::Store;
9use crate::config::StorageMode;
10use crate::schema::SessionRecord;
11use crate::Result;
12
13impl Store {
14 pub fn write_session(&self, record: &SessionRecord) -> Result<()> {
22 match self.config.storage_mode {
23 StorageMode::Parquet => self.write_session_parquet(record),
24 StorageMode::DuckDB => self.write_session_duckdb(record),
25 }
26 }
27
28 fn write_session_parquet(&self, record: &SessionRecord) -> Result<()> {
30 let conn = self.connection_with_options(false)?;
31
32 let partition_dir = self.config.sessions_dir(&record.date);
34 fs::create_dir_all(&partition_dir)?;
35
36 let filename = format!("{}.parquet", record.session_id);
38 let file_path = partition_dir.join(&filename);
39
40 conn.execute_batch(
42 r#"
43 CREATE OR REPLACE TEMP TABLE temp_session (
44 session_id VARCHAR,
45 client_id VARCHAR,
46 invoker VARCHAR,
47 invoker_pid INTEGER,
48 invoker_type VARCHAR,
49 registered_at TIMESTAMP,
50 cwd VARCHAR,
51 date DATE
52 );
53 "#,
54 )?;
55
56 conn.execute(
57 r#"
58 INSERT INTO temp_session VALUES (
59 ?, ?, ?, ?, ?, ?, ?, ?
60 )
61 "#,
62 params![
63 record.session_id,
64 record.client_id,
65 record.invoker,
66 record.invoker_pid,
67 record.invoker_type,
68 record.registered_at.to_rfc3339(),
69 record.cwd,
70 record.date.to_string(),
71 ],
72 )?;
73
74 let temp_path = atomic::temp_path(&file_path);
76 conn.execute(
77 &format!(
78 "COPY temp_session TO '{}' (FORMAT PARQUET, COMPRESSION ZSTD)",
79 temp_path.display()
80 ),
81 [],
82 )?;
83 conn.execute("DROP TABLE temp_session", [])?;
84
85 atomic::rename_into_place(&temp_path, &file_path)?;
87
88 Ok(())
89 }
90
91 fn write_session_duckdb(&self, record: &SessionRecord) -> Result<()> {
93 let conn = self.connection()?;
94
95 conn.execute(
96 r#"
97 INSERT INTO local.sessions VALUES (
98 ?, ?, ?, ?, ?, ?, ?, ?
99 )
100 "#,
101 params![
102 record.session_id,
103 record.client_id,
104 record.invoker,
105 record.invoker_pid,
106 record.invoker_type,
107 record.registered_at.to_rfc3339(),
108 record.cwd,
109 record.date.to_string(),
110 ],
111 )?;
112
113 Ok(())
114 }
115
116 pub fn session_exists(&self, session_id: &str) -> Result<bool> {
118 let conn = self.connection()?;
119
120 let result: std::result::Result<i64, _> = conn.query_row(
121 &format!(
122 "SELECT COUNT(*) FROM sessions WHERE session_id = '{}'",
123 session_id
124 ),
125 [],
126 |row| row.get(0),
127 );
128
129 match result {
130 Ok(count) => Ok(count > 0),
131 Err(e) => {
132 if e.to_string().contains("No files found") {
133 Ok(false)
134 } else {
135 Err(e.into())
136 }
137 }
138 }
139 }
140
141 pub fn ensure_session(&self, record: &SessionRecord) -> Result<()> {
146 if !self.session_exists(&record.session_id)? {
147 self.write_session(record)?;
148 }
149 Ok(())
150 }
151
152 pub fn session_count(&self) -> Result<i64> {
154 let conn = self.connection()?;
155
156 let result: std::result::Result<i64, _> =
157 conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0));
158
159 match result {
160 Ok(count) => Ok(count),
161 Err(e) => {
162 if e.to_string().contains("No files found") {
163 Ok(0)
164 } else {
165 Err(e.into())
166 }
167 }
168 }
169 }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::init::initialize;
    use crate::Config;
    use tempfile::TempDir;

    /// Builds an initialized store rooted in a fresh temporary directory.
    ///
    /// The `TempDir` handle is returned with the store so each test can keep
    /// it bound for as long as the store is in use.
    fn setup_store() -> (TempDir, Store) {
        let root = TempDir::new().unwrap();
        let config = Config::with_root(root.path());
        initialize(&config).unwrap();
        let store = Store::open(config).unwrap();
        (root, store)
    }

    /// The `zsh-12345` shell session used by the write and existence tests.
    fn zsh_session() -> SessionRecord {
        SessionRecord::new("zsh-12345", "user@laptop", "zsh", 12345, "shell")
    }

    #[test]
    fn test_write_session() {
        let (_root, store) = setup_store();

        store.write_session(&zsh_session()).unwrap();

        assert_eq!(
            store.session_count().unwrap(),
            1,
            "one written session should be counted once"
        );
    }

    #[test]
    fn test_session_exists() {
        let (_root, store) = setup_store();

        // Nothing has been written yet, so the lookup must come back negative.
        let found_before = store.session_exists("zsh-12345").unwrap();
        assert!(!found_before, "session must not exist before it is written");

        store.write_session(&zsh_session()).unwrap();

        let found_after = store.session_exists("zsh-12345").unwrap();
        assert!(found_after, "session must exist after it is written");
    }

    #[test]
    fn test_ensure_session_creates_new() {
        let (_root, store) = setup_store();
        let bash = SessionRecord::new("bash-67890", "user@laptop", "bash", 67890, "shell");

        store.ensure_session(&bash).unwrap();

        assert!(store.session_exists("bash-67890").unwrap());
        assert_eq!(store.session_count().unwrap(), 1);
    }

    #[test]
    fn test_ensure_session_idempotent() {
        let (_root, store) = setup_store();
        let zsh = SessionRecord::new("zsh-11111", "user@laptop", "zsh", 11111, "shell");

        // Ensuring the same record twice must leave exactly one stored session.
        store.ensure_session(&zsh).unwrap();
        store.ensure_session(&zsh).unwrap();

        assert_eq!(store.session_count().unwrap(), 1);
    }

    #[test]
    fn test_session_count_empty() {
        let (_root, store) = setup_store();

        assert_eq!(
            store.session_count().unwrap(),
            0,
            "a freshly initialized store should hold no sessions"
        );
    }
}