Skip to main content

aion_store_libsql/
connection.rs

1//! Open embedded and embedded-replica libSQL connections.
2
3use std::path::Path;
4use std::time::Duration;
5
6use aion_store::StoreError;
7
8use crate::config::{LibSqlConfig, LibSqlMode};
9
10/// Opened libSQL database handle and its mode-agnostic connection.
11pub struct OpenedConnection {
12    /// Database handle used to create the connection and to trigger replica sync.
13    pub database: libsql::Database,
14    /// Connection used by the event-store implementation.
15    pub connection: libsql::Connection,
16}
17
18/// Open the configured libSQL database and return its handle and connection.
19///
20/// Operator-provided journal and synchronous settings are applied only when present on the
21/// configuration. This crate intentionally does not choose durability defaults for omitted
22/// tunables.
23///
24/// # Errors
25///
26/// Returns `StoreError::Backend` when libSQL cannot build/connect the database or when applying an
27/// explicitly configured PRAGMA fails.
28pub async fn open_connection(config: &LibSqlConfig) -> Result<OpenedConnection, StoreError> {
29    let opened = match &config.mode {
30        LibSqlMode::Embedded { path } => open_embedded(path).await?,
31        LibSqlMode::EmbeddedReplica {
32            path,
33            primary_url,
34            auth_token,
35        } => {
36            open_embedded_replica(
37                path,
38                primary_url.clone(),
39                auth_token.clone(),
40                config.sync_interval_seconds,
41            )
42            .await?
43        }
44    };
45
46    apply_pragmas(
47        &opened.connection,
48        config.journal_mode.as_deref(),
49        config.synchronous.as_deref(),
50    )
51    .await?;
52
53    Ok(opened)
54}
55
56async fn open_embedded(path: &Path) -> Result<OpenedConnection, StoreError> {
57    let database = libsql::Builder::new_local(path)
58        .build()
59        .await
60        .map_err(|error| crate::error::libsql_error(&error))?;
61
62    let connection = database
63        .connect()
64        .map_err(|error| crate::error::libsql_error(&error))?;
65
66    Ok(OpenedConnection {
67        database,
68        connection,
69    })
70}
71
72async fn open_embedded_replica(
73    path: &Path,
74    primary_url: String,
75    auth_token: String,
76    sync_interval_seconds: Option<u64>,
77) -> Result<OpenedConnection, StoreError> {
78    let mut builder = libsql::Builder::new_remote_replica(path, primary_url, auth_token);
79    if let Some(seconds) = sync_interval_seconds {
80        builder = builder.sync_interval(Duration::from_secs(seconds));
81    }
82
83    let db = builder
84        .build()
85        .await
86        .map_err(|error| crate::error::libsql_error(&error))?;
87
88    let connection = db
89        .connect()
90        .map_err(|error| crate::error::libsql_error(&error))?;
91
92    Ok(OpenedConnection {
93        database: db,
94        connection,
95    })
96}
97
98async fn apply_pragmas(
99    conn: &libsql::Connection,
100    journal_mode: Option<&str>,
101    synchronous: Option<&str>,
102) -> Result<(), StoreError> {
103    if let Some(value) = journal_mode {
104        execute_pragma(conn, "journal_mode", value).await?;
105    }
106
107    if let Some(value) = synchronous {
108        execute_pragma(conn, "synchronous", value).await?;
109    }
110
111    Ok(())
112}
113
114async fn execute_pragma(
115    conn: &libsql::Connection,
116    name: &str,
117    value: &str,
118) -> Result<(), StoreError> {
119    let value = validate_pragma_value(value)?;
120    let sql = format!("PRAGMA {name} = {value}");
121    conn.query(&sql, ())
122        .await
123        .map_err(|error| crate::error::libsql_error(&error))?;
124
125    Ok(())
126}
127
128fn validate_pragma_value(value: &str) -> Result<&str, StoreError> {
129    if !value.is_empty()
130        && value
131            .chars()
132            .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
133    {
134        Ok(value)
135    } else {
136        Err(StoreError::Backend(format!(
137            "invalid libSQL PRAGMA value {value:?}"
138        )))
139    }
140}
141
142#[cfg(test)]
143mod tests {
144    use std::path::PathBuf;
145    use std::time::{SystemTime, UNIX_EPOCH};
146
147    use aion_store::StoreError;
148
149    use super::open_connection;
150    use crate::config::{LibSqlConfig, LibSqlMode};
151
152    #[tokio::test]
153    async fn opens_embedded_connection_and_queries() -> Result<(), StoreError> {
154        let config = LibSqlConfig {
155            mode: LibSqlMode::Embedded {
156                path: unique_temp_path("embedded-select"),
157            },
158            journal_mode: Some(String::from("wal")),
159            synchronous: Some(String::from("normal")),
160            sync_interval_seconds: None,
161        };
162
163        let opened = open_connection(&config).await?;
164        let conn = opened.connection;
165        let mut rows = conn
166            .query("SELECT 1", ())
167            .await
168            .map_err(|error| crate::error::libsql_error(&error))?;
169        let row = rows
170            .next()
171            .await
172            .map_err(|error| crate::error::libsql_error(&error))?
173            .ok_or_else(|| StoreError::Backend(String::from("SELECT 1 returned no rows")))?;
174        let value: i64 = row
175            .get(0)
176            .map_err(|error| crate::error::libsql_error(&error))?;
177
178        assert_eq!(value, 1);
179        Ok(())
180    }
181
182    #[tokio::test]
183    async fn maps_replica_open_failure_to_backend() -> Result<(), Box<dyn std::error::Error>> {
184        let config = LibSqlConfig {
185            mode: LibSqlMode::EmbeddedReplica {
186                path: unique_temp_path("replica-unavailable-primary"),
187                primary_url: String::from("http://127.0.0.1:9"),
188                auth_token: String::from("token"),
189            },
190            journal_mode: None,
191            synchronous: None,
192            sync_interval_seconds: Some(1),
193        };
194
195        match open_connection(&config).await {
196            Ok(_) => Err("expected embedded-replica open to fail for an invalid URL".into()),
197            Err(StoreError::Backend(_)) => Ok(()),
198            Err(other) => Err(format!("expected backend error, got {other:?}").into()),
199        }
200    }
201
202    fn unique_temp_path(name: &str) -> PathBuf {
203        let nanos = SystemTime::now()
204            .duration_since(UNIX_EPOCH)
205            .map_or(0, |duration| duration.as_nanos());
206        std::env::temp_dir().join(format!(
207            "aion-store-libsql-{name}-{}-{nanos}.db",
208            std::process::id()
209        ))
210    }
211}