Skip to main content

atuin_client/
meta.rs

1use std::path::Path;
2use std::str::FromStr;
3use std::time::Duration;
4
5use atuin_common::record::HostId;
6use eyre::{Result, eyre};
7use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqlitePoolOptions};
8use time::{OffsetDateTime, format_description::well_known::Rfc3339};
9use tokio::sync::OnceCell;
10use uuid::Uuid;
11
// Filenames for the legacy plain-text files that we migrate from.
// Each is joined onto the effective data directory by `MetaStore::migrate_files`.
const LEGACY_HOST_ID_FILENAME: &str = "host_id";
const LEGACY_LAST_SYNC_FILENAME: &str = "last_sync_time";
const LEGACY_LAST_VERSION_CHECK_FILENAME: &str = "last_version_check_time";
const LEGACY_LATEST_VERSION_FILENAME: &str = "latest_version";
const LEGACY_SESSION_FILENAME: &str = "session";

// Keys used for rows in the `meta` table (see `MetaStore::get` / `MetaStore::set`).
const KEY_HOST_ID: &str = "host_id";
const KEY_LAST_SYNC: &str = "last_sync_time";
const KEY_LAST_VERSION_CHECK: &str = "last_version_check_time";
const KEY_LATEST_VERSION: &str = "latest_version";
const KEY_SESSION: &str = "session";
const KEY_HUB_SESSION: &str = "hub_session";
// Marker row written once `MetaStore::migrate_files` has run, so it is not repeated.
const KEY_FILES_MIGRATED: &str = "files_migrated";
26
/// Small key-value store backed by a SQLite `meta` table.
///
/// Holds client metadata such as the host ID, sync / version-check
/// timestamps and session tokens.
pub struct MetaStore {
    /// Connection pool for the meta database.
    pool: SqlitePool,
    /// Host ID, resolved at most once per store by [`MetaStore::host_id`].
    cached_host_id: OnceCell<HostId>,
}
31
32impl MetaStore {
33    pub async fn new(path: impl AsRef<Path>, timeout: f64) -> Result<Self> {
34        let path = path.as_ref();
35        let path_str = path
36            .as_os_str()
37            .to_str()
38            .ok_or_else(|| eyre!("meta database path is not valid UTF-8: {path:?}"))?;
39        debug!("opening meta sqlite database at {path:?}");
40
41        let is_memory = path_str.contains(":memory:");
42
43        if !is_memory
44            && !path.exists()
45            && let Some(dir) = path.parent()
46        {
47            fs_err::create_dir_all(dir)?;
48        }
49
50        // Use DELETE journal mode instead of WAL. This is a small, infrequently-
51        // written KV store — WAL's concurrency benefits aren't needed, and DELETE
52        // mode avoids creating auxiliary -wal/-shm files that complicate
53        // permission handling.
54        let opts = SqliteConnectOptions::from_str(path_str)?
55            .journal_mode(SqliteJournalMode::Delete)
56            .optimize_on_close(true, None)
57            .create_if_missing(true);
58
59        let pool = SqlitePoolOptions::new()
60            .acquire_timeout(Duration::from_secs_f64(timeout))
61            .connect_with(opts)
62            .await?;
63
64        sqlx::migrate!("./meta-migrations").run(&pool).await?;
65
66        // Session tokens are stored in this database, so restrict permissions.
67        #[cfg(unix)]
68        if !is_memory {
69            use std::os::unix::fs::PermissionsExt;
70            std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
71        }
72
73        let store = Self {
74            pool,
75            cached_host_id: OnceCell::const_new(),
76        };
77
78        if !is_memory {
79            store.migrate_files().await?;
80        }
81
82        Ok(store)
83    }
84
85    // Generic key-value operations
86
87    pub async fn get(&self, key: &str) -> Result<Option<String>> {
88        let row: Option<(String,)> = sqlx::query_as("SELECT value FROM meta WHERE key = ?1")
89            .bind(key)
90            .fetch_optional(&self.pool)
91            .await?;
92
93        Ok(row.map(|r| r.0))
94    }
95
96    pub async fn set(&self, key: &str, value: &str) -> Result<()> {
97        sqlx::query(
98            "INSERT INTO meta (key, value, updated_at) VALUES (?1, ?2, strftime('%s', 'now'))
99             ON CONFLICT(key) DO UPDATE SET value = ?2, updated_at = strftime('%s', 'now')",
100        )
101        .bind(key)
102        .bind(value)
103        .execute(&self.pool)
104        .await?;
105
106        Ok(())
107    }
108
109    pub async fn delete(&self, key: &str) -> Result<()> {
110        sqlx::query("DELETE FROM meta WHERE key = ?1")
111            .bind(key)
112            .execute(&self.pool)
113            .await?;
114
115        Ok(())
116    }
117
118    // Typed accessors
119
120    pub async fn host_id(&self) -> Result<HostId> {
121        self.cached_host_id
122            .get_or_try_init(|| async {
123                if let Some(id) = self.get(KEY_HOST_ID).await? {
124                    let parsed = Uuid::from_str(id.as_str())
125                        .map_err(|e| eyre!("failed to parse host ID: {e}"))?;
126                    return Ok(HostId(parsed));
127                }
128
129                let uuid = atuin_common::utils::uuid_v7();
130                self.set(KEY_HOST_ID, uuid.as_simple().to_string().as_ref())
131                    .await?;
132
133                Ok(HostId(uuid))
134            })
135            .await
136            .copied()
137    }
138
139    pub async fn last_sync(&self) -> Result<OffsetDateTime> {
140        match self.get(KEY_LAST_SYNC).await? {
141            Some(v) => Ok(OffsetDateTime::parse(v.as_str(), &Rfc3339)?),
142            None => Ok(OffsetDateTime::UNIX_EPOCH),
143        }
144    }
145
146    pub async fn save_sync_time(&self) -> Result<()> {
147        self.set(
148            KEY_LAST_SYNC,
149            OffsetDateTime::now_utc().format(&Rfc3339)?.as_str(),
150        )
151        .await
152    }
153
154    pub async fn last_version_check(&self) -> Result<OffsetDateTime> {
155        match self.get(KEY_LAST_VERSION_CHECK).await? {
156            Some(v) => Ok(OffsetDateTime::parse(v.as_str(), &Rfc3339)?),
157            None => Ok(OffsetDateTime::UNIX_EPOCH),
158        }
159    }
160
161    pub async fn save_version_check_time(&self) -> Result<()> {
162        self.set(
163            KEY_LAST_VERSION_CHECK,
164            OffsetDateTime::now_utc().format(&Rfc3339)?.as_str(),
165        )
166        .await
167    }
168
    /// Returns the value stored under the latest-version key, or `None` if
    /// nothing has been saved.
    pub async fn latest_version(&self) -> Result<Option<String>> {
        self.get(KEY_LATEST_VERSION).await
    }
172
    /// Stores `version` under the latest-version key, replacing any previous
    /// value. The string is stored as given, without validation.
    pub async fn save_latest_version(&self, version: &str) -> Result<()> {
        self.set(KEY_LATEST_VERSION, version).await
    }
176
    /// Returns the stored session token, or `None` if there is none.
    pub async fn session_token(&self) -> Result<Option<String>> {
        self.get(KEY_SESSION).await
    }
180
    /// Stores `token` as the session token, replacing any previous one.
    pub async fn save_session(&self, token: &str) -> Result<()> {
        self.set(KEY_SESSION, token).await
    }
184
    /// Removes the stored session token, if any. The hub session token is
    /// kept under a separate key and is not touched.
    pub async fn delete_session(&self) -> Result<()> {
        self.delete(KEY_SESSION).await
    }
188
189    pub async fn logged_in(&self) -> Result<bool> {
190        Ok(self.session_token().await?.is_some() || self.hub_session_token().await?.is_some())
191    }
192
193    // Hub session methods (separate from sync session, used for Hub-specific features like AI)
194
    /// Returns the stored hub session token, or `None` if there is none.
    pub async fn hub_session_token(&self) -> Result<Option<String>> {
        self.get(KEY_HUB_SESSION).await
    }
198
    /// Stores `token` as the hub session token, replacing any previous one.
    pub async fn save_hub_session(&self, token: &str) -> Result<()> {
        self.set(KEY_HUB_SESSION, token).await
    }
202
    /// Removes the stored hub session token, if any. The regular session
    /// token is kept under a separate key and is not touched.
    pub async fn delete_hub_session(&self) -> Result<()> {
        self.delete(KEY_HUB_SESSION).await
    }
206
207    pub async fn hub_logged_in(&self) -> Result<bool> {
208        Ok(self.hub_session_token().await?.is_some())
209    }
210
211    // File migration: on first open, migrate old plain-text files into the database.
212    // Old files are left in place for safe downgrades.
213
214    async fn migrate_files(&self) -> Result<()> {
215        if self.get(KEY_FILES_MIGRATED).await?.is_some() {
216            return Ok(());
217        }
218
219        let data_dir = crate::settings::Settings::effective_data_dir();
220
221        // host_id — validate as UUID
222        let host_id_path = data_dir.join(LEGACY_HOST_ID_FILENAME);
223        if host_id_path.exists()
224            && let Ok(value) = fs_err::read_to_string(&host_id_path)
225        {
226            let value = value.trim();
227            if !value.is_empty() {
228                if Uuid::from_str(value).is_ok() {
229                    self.set(KEY_HOST_ID, value).await?;
230                } else {
231                    warn!("skipping migration of host_id: invalid UUID {value:?}");
232                }
233            }
234        }
235
236        // last_sync_time — validate as RFC3339
237        let sync_path = data_dir.join(LEGACY_LAST_SYNC_FILENAME);
238        if sync_path.exists()
239            && let Ok(value) = fs_err::read_to_string(&sync_path)
240        {
241            let value = value.trim();
242            if !value.is_empty() {
243                if OffsetDateTime::parse(value, &Rfc3339).is_ok() {
244                    self.set(KEY_LAST_SYNC, value).await?;
245                } else {
246                    warn!("skipping migration of last_sync_time: invalid RFC3339 {value:?}");
247                }
248            }
249        }
250
251        // last_version_check_time — validate as RFC3339
252        let version_check_path = data_dir.join(LEGACY_LAST_VERSION_CHECK_FILENAME);
253        if version_check_path.exists()
254            && let Ok(value) = fs_err::read_to_string(&version_check_path)
255        {
256            let value = value.trim();
257            if !value.is_empty() {
258                if OffsetDateTime::parse(value, &Rfc3339).is_ok() {
259                    self.set(KEY_LAST_VERSION_CHECK, value).await?;
260                } else {
261                    warn!(
262                        "skipping migration of last_version_check_time: invalid RFC3339 {value:?}"
263                    );
264                }
265            }
266        }
267
268        // latest_version — no strict validation, just non-empty
269        let latest_version_path = data_dir.join(LEGACY_LATEST_VERSION_FILENAME);
270        if latest_version_path.exists()
271            && let Ok(value) = fs_err::read_to_string(&latest_version_path)
272        {
273            let value = value.trim();
274            if !value.is_empty() {
275                self.set(KEY_LATEST_VERSION, value).await?;
276            }
277        }
278
279        // session token — no strict validation, just non-empty
280        let session_path = data_dir.join(LEGACY_SESSION_FILENAME);
281        if session_path.exists()
282            && let Ok(value) = fs_err::read_to_string(&session_path)
283        {
284            let value = value.trim();
285            if !value.is_empty() {
286                self.set(KEY_SESSION, value).await?;
287            }
288        }
289
290        self.set(KEY_FILES_MIGRATED, "true").await?;
291
292        Ok(())
293    }
294}
295
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh store on an in-memory SQLite database.
    async fn new_test_store() -> MetaStore {
        let opened = MetaStore::new("sqlite::memory:", 2.0).await;
        opened.unwrap()
    }

    /// Generic KV round trip: missing -> set -> overwrite -> delete.
    #[tokio::test]
    async fn test_get_set_delete() {
        let meta = new_test_store().await;

        assert!(meta.get("foo").await.unwrap().is_none());

        meta.set("foo", "bar").await.unwrap();
        assert_eq!(meta.get("foo").await.unwrap().as_deref(), Some("bar"));

        meta.set("foo", "baz").await.unwrap();
        assert_eq!(meta.get("foo").await.unwrap().as_deref(), Some("baz"));

        meta.delete("foo").await.unwrap();
        assert!(meta.get("foo").await.unwrap().is_none());
    }

    /// Two consecutive `host_id` calls must agree.
    #[tokio::test]
    async fn test_host_id_generation_and_stability() {
        let meta = new_test_store().await;

        let first = meta.host_id().await.unwrap();
        let second = meta.host_id().await.unwrap();

        assert_eq!(first, second, "host_id should be stable across calls");
    }

    /// `last_sync` starts at the epoch and moves forward once saved.
    #[tokio::test]
    async fn test_sync_time() {
        let meta = new_test_store().await;

        let before = meta.last_sync().await.unwrap();
        assert_eq!(before, OffsetDateTime::UNIX_EPOCH);

        meta.save_sync_time().await.unwrap();
        let after = meta.last_sync().await.unwrap();
        assert!(after > OffsetDateTime::UNIX_EPOCH);
    }

    /// `last_version_check` starts at the epoch and moves forward once saved.
    #[tokio::test]
    async fn test_version_check_time() {
        let meta = new_test_store().await;

        let before = meta.last_version_check().await.unwrap();
        assert_eq!(before, OffsetDateTime::UNIX_EPOCH);

        meta.save_version_check_time().await.unwrap();
        let after = meta.last_version_check().await.unwrap();
        assert!(after > OffsetDateTime::UNIX_EPOCH);
    }

    /// Saving and deleting a session token toggles `logged_in`.
    #[tokio::test]
    async fn test_session_crud() {
        let meta = new_test_store().await;

        assert!(!meta.logged_in().await.unwrap());
        assert!(meta.session_token().await.unwrap().is_none());

        meta.save_session("tok123").await.unwrap();
        assert!(meta.logged_in().await.unwrap());
        let token = meta.session_token().await.unwrap();
        assert_eq!(token.as_deref(), Some("tok123"));

        meta.delete_session().await.unwrap();
        assert!(!meta.logged_in().await.unwrap());
    }

    /// `latest_version` is empty until a version is saved.
    #[tokio::test]
    async fn test_latest_version() {
        let meta = new_test_store().await;

        assert!(meta.latest_version().await.unwrap().is_none());

        meta.save_latest_version("1.2.3").await.unwrap();
        let version = meta.latest_version().await.unwrap();
        assert_eq!(version.as_deref(), Some("1.2.3"));
    }
}