aion_store_libsql/
connection.rs1use std::path::Path;
4use std::time::Duration;
5
6use aion_store::StoreError;
7
8use crate::config::{LibSqlConfig, LibSqlMode};
9
/// A libSQL database handle bundled with a connection opened from it.
///
/// Returned by [`open_connection`]; both fields are public so the caller
/// decides how long each one is kept around.
pub struct OpenedConnection {
    /// Database handle that `connection` was created from.
    // NOTE(review): presumably kept so the database outlives the connection
    // (and, for replicas, its sync machinery) — confirm against libSQL docs.
    pub database: libsql::Database,
    /// Connection obtained by calling `connect()` on `database`.
    pub connection: libsql::Connection,
}
17
18pub async fn open_connection(config: &LibSqlConfig) -> Result<OpenedConnection, StoreError> {
29 let opened = match &config.mode {
30 LibSqlMode::Embedded { path } => open_embedded(path).await?,
31 LibSqlMode::EmbeddedReplica {
32 path,
33 primary_url,
34 auth_token,
35 } => {
36 open_embedded_replica(
37 path,
38 primary_url.clone(),
39 auth_token.clone(),
40 config.sync_interval_seconds,
41 )
42 .await?
43 }
44 };
45
46 apply_pragmas(
47 &opened.connection,
48 config.journal_mode.as_deref(),
49 config.synchronous.as_deref(),
50 )
51 .await?;
52
53 Ok(opened)
54}
55
56async fn open_embedded(path: &Path) -> Result<OpenedConnection, StoreError> {
57 let database = libsql::Builder::new_local(path)
58 .build()
59 .await
60 .map_err(|error| crate::error::libsql_error(&error))?;
61
62 let connection = database
63 .connect()
64 .map_err(|error| crate::error::libsql_error(&error))?;
65
66 Ok(OpenedConnection {
67 database,
68 connection,
69 })
70}
71
72async fn open_embedded_replica(
73 path: &Path,
74 primary_url: String,
75 auth_token: String,
76 sync_interval_seconds: Option<u64>,
77) -> Result<OpenedConnection, StoreError> {
78 let mut builder = libsql::Builder::new_remote_replica(path, primary_url, auth_token);
79 if let Some(seconds) = sync_interval_seconds {
80 builder = builder.sync_interval(Duration::from_secs(seconds));
81 }
82
83 let db = builder
84 .build()
85 .await
86 .map_err(|error| crate::error::libsql_error(&error))?;
87
88 let connection = db
89 .connect()
90 .map_err(|error| crate::error::libsql_error(&error))?;
91
92 Ok(OpenedConnection {
93 database: db,
94 connection,
95 })
96}
97
98async fn apply_pragmas(
99 conn: &libsql::Connection,
100 journal_mode: Option<&str>,
101 synchronous: Option<&str>,
102) -> Result<(), StoreError> {
103 if let Some(value) = journal_mode {
104 execute_pragma(conn, "journal_mode", value).await?;
105 }
106
107 if let Some(value) = synchronous {
108 execute_pragma(conn, "synchronous", value).await?;
109 }
110
111 Ok(())
112}
113
114async fn execute_pragma(
115 conn: &libsql::Connection,
116 name: &str,
117 value: &str,
118) -> Result<(), StoreError> {
119 let value = validate_pragma_value(value)?;
120 let sql = format!("PRAGMA {name} = {value}");
121 conn.query(&sql, ())
122 .await
123 .map_err(|error| crate::error::libsql_error(&error))?;
124
125 Ok(())
126}
127
128fn validate_pragma_value(value: &str) -> Result<&str, StoreError> {
129 if !value.is_empty()
130 && value
131 .chars()
132 .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
133 {
134 Ok(value)
135 } else {
136 Err(StoreError::Backend(format!(
137 "invalid libSQL PRAGMA value {value:?}"
138 )))
139 }
140}
141
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    use aion_store::StoreError;

    use super::open_connection;
    use crate::config::{LibSqlConfig, LibSqlMode};

    /// Suffixes of the sidecar files SQLite may place next to a database file
    /// when write-ahead logging is enabled.
    const SIDECAR_SUFFIXES: [&str; 2] = ["-wal", "-shm"];

    /// A unique temporary database location that is cleaned up on drop.
    ///
    /// Declare it before any libSQL handle in a test so that it is dropped
    /// last, after the handles that may still hold the files open.
    struct TempDatabase {
        path: PathBuf,
    }

    impl TempDatabase {
        /// Reserves a fresh path labelled with `name`; no file is created yet.
        fn new(name: &str) -> Self {
            Self {
                path: unique_temp_path(name),
            }
        }
    }

    impl Drop for TempDatabase {
        fn drop(&mut self) {
            // Best effort only: the files may never have been created, and a
            // cleanup failure must not mask the real test outcome.
            // NOTE(review): a replica may write further metadata files next to
            // the database; extend `SIDECAR_SUFFIXES` if any are observed.
            let _ = std::fs::remove_file(&self.path);
            for suffix in SIDECAR_SUFFIXES {
                let mut sidecar = self.path.as_os_str().to_os_string();
                sidecar.push(suffix);
                let _ = std::fs::remove_file(sidecar);
            }
        }
    }

    /// An embedded database opens with both pragmas set and answers a query.
    #[tokio::test]
    async fn opens_embedded_connection_and_queries() -> Result<(), StoreError> {
        let temp = TempDatabase::new("embedded-select");
        let config = LibSqlConfig {
            mode: LibSqlMode::Embedded {
                path: temp.path.clone(),
            },
            journal_mode: Some(String::from("wal")),
            synchronous: Some(String::from("normal")),
            sync_interval_seconds: None,
        };

        let opened = open_connection(&config).await?;
        let conn = opened.connection;
        let mut rows = conn
            .query("SELECT 1", ())
            .await
            .map_err(|error| crate::error::libsql_error(&error))?;
        let row = rows
            .next()
            .await
            .map_err(|error| crate::error::libsql_error(&error))?
            .ok_or_else(|| StoreError::Backend(String::from("SELECT 1 returned no rows")))?;
        let value: i64 = row
            .get(0)
            .map_err(|error| crate::error::libsql_error(&error))?;

        assert_eq!(value, 1);
        Ok(())
    }

    /// A replica whose primary cannot be reached surfaces a backend error.
    #[tokio::test]
    async fn maps_replica_open_failure_to_backend() -> Result<(), Box<dyn std::error::Error>> {
        let temp = TempDatabase::new("replica-unavailable-primary");
        let config = LibSqlConfig {
            mode: LibSqlMode::EmbeddedReplica {
                path: temp.path.clone(),
                // Well-formed URL; the test relies on nothing listening there.
                primary_url: String::from("http://127.0.0.1:9"),
                auth_token: String::from("token"),
            },
            journal_mode: None,
            synchronous: None,
            sync_interval_seconds: Some(1),
        };

        match open_connection(&config).await {
            Ok(_) => Err("expected embedded-replica open to fail for an unreachable primary".into()),
            Err(StoreError::Backend(_)) => Ok(()),
            Err(other) => Err(format!("expected backend error, got {other:?}").into()),
        }
    }

    /// Builds a path in the OS temp dir that is unique per label, process id,
    /// and current time in nanoseconds (0 if the clock is before the epoch).
    fn unique_temp_path(name: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |duration| duration.as_nanos());
        std::env::temp_dir().join(format!(
            "aion-store-libsql-{name}-{}-{nanos}.db",
            std::process::id()
        ))
    }
}