use std::{
future::Future,
io::ErrorKind,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
time::Duration,
};
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::{header::AUTHORIZATION, Request};
use hyper_tls::HttpsConnector;
use hyper_util::{
client::legacy::{connect::HttpConnector, Client},
rt::TokioExecutor,
};
use tokio::sync::mpsc;
use crate::{connection::Connection, Error, Result};
pub use turso_sync_sdk_kit::rsapi::DatabaseSyncStats;
pub use turso_sync_sdk_kit::rsapi::PartialBootstrapStrategy;
pub use turso_sync_sdk_kit::rsapi::PartialSyncOpts;
const DEFAULT_CLIENT_NAME: &str = "turso-sync-rust";
#[derive(Debug, Clone, Copy)]
pub enum RemoteEncryptionCipher {
Aes256Gcm,
Aes128Gcm,
ChaCha20Poly1305,
Aegis128L,
Aegis128X2,
Aegis128X4,
Aegis256,
Aegis256X2,
Aegis256X4,
}
impl RemoteEncryptionCipher {
pub fn reserved_bytes(&self) -> usize {
match self {
Self::Aes256Gcm | Self::Aes128Gcm | Self::ChaCha20Poly1305 => 28,
Self::Aegis128L | Self::Aegis128X2 | Self::Aegis128X4 => 32,
Self::Aegis256 | Self::Aegis256X2 | Self::Aegis256X4 => 48,
}
}
}
impl std::str::FromStr for RemoteEncryptionCipher {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"aes256gcm" | "aes-256-gcm" => Ok(Self::Aes256Gcm),
"aes128gcm" | "aes-128-gcm" => Ok(Self::Aes128Gcm),
"chacha20poly1305" | "chacha20-poly1305" => Ok(Self::ChaCha20Poly1305),
"aegis128l" | "aegis-128l" => Ok(Self::Aegis128L),
"aegis128x2" | "aegis-128x2" => Ok(Self::Aegis128X2),
"aegis128x4" | "aegis-128x4" => Ok(Self::Aegis128X4),
"aegis256" | "aegis-256" => Ok(Self::Aegis256),
"aegis256x2" | "aegis-256x2" => Ok(Self::Aegis256X2),
"aegis256x4" | "aegis-256x4" => Ok(Self::Aegis256X4),
_ => Err(format!(
"unknown cipher: '{s}'. Supported: aes256gcm, aes128gcm, chacha20poly1305, \
aegis128l, aegis128x2, aegis128x4, aegis256, aegis256x2, aegis256x4"
)),
}
}
}
pub struct Builder {
path: String,
remote_url: Option<String>,
auth_token: Option<String>,
client_name: Option<String>,
long_poll_timeout: Option<Duration>,
bootstrap_if_empty: bool,
partial_sync_config_experimental: Option<PartialSyncOpts>,
remote_encryption_key: Option<String>,
remote_encryption_cipher: Option<RemoteEncryptionCipher>,
}
impl Builder {
pub fn new_remote(path: &str) -> Self {
Self {
path: path.to_string(),
remote_url: None,
auth_token: None,
client_name: None,
long_poll_timeout: None,
bootstrap_if_empty: true,
partial_sync_config_experimental: None,
remote_encryption_key: None,
remote_encryption_cipher: None,
}
}
pub fn with_remote_url(mut self, remote_url: impl Into<String>) -> Self {
self.remote_url = Some(remote_url.into());
self
}
pub fn with_auth_token(mut self, token: impl Into<String>) -> Self {
self.auth_token = Some(token.into());
self
}
pub fn with_client_name(mut self, name: impl Into<String>) -> Self {
self.client_name = Some(name.into());
self
}
pub fn with_long_poll_timeout(mut self, timeout: Duration) -> Self {
self.long_poll_timeout = Some(timeout);
self
}
pub fn bootstrap_if_empty(mut self, enable: bool) -> Self {
self.bootstrap_if_empty = enable;
self
}
pub fn with_partial_sync_opts_experimental(mut self, opts: PartialSyncOpts) -> Self {
self.partial_sync_config_experimental = Some(opts);
self
}
pub fn with_remote_encryption(
mut self,
base64_key: impl Into<String>,
cipher: RemoteEncryptionCipher,
) -> Self {
self.remote_encryption_key = Some(base64_key.into());
self.remote_encryption_cipher = Some(cipher);
self
}
pub fn with_remote_encryption_key(mut self, base64_key: impl Into<String>) -> Self {
self.remote_encryption_key = Some(base64_key.into());
self
}
pub async fn build(self) -> Result<Database> {
let db_config = turso_sdk_kit::rsapi::TursoDatabaseConfig {
path: self.path.clone(),
experimental_features: None,
async_io: true,
encryption: None,
vfs: None,
io: None,
db_file: None,
};
let url = if let Some(remote_url) = &self.remote_url {
Some(normalize_base_url(remote_url).map_err(Error::Error)?)
} else {
None
};
let reserved_bytes = self
.remote_encryption_cipher
.map(|cipher| cipher.reserved_bytes());
let sync_config = turso_sync_sdk_kit::rsapi::TursoDatabaseSyncConfig {
path: self.path.clone(),
remote_url: url.clone(),
client_name: self
.client_name
.clone()
.unwrap_or_else(|| DEFAULT_CLIENT_NAME.to_string()),
long_poll_timeout_ms: self
.long_poll_timeout
.map(|d| d.as_millis().min(u32::MAX as u128) as u32),
bootstrap_if_empty: self.bootstrap_if_empty,
reserved_bytes,
partial_sync_opts: self.partial_sync_config_experimental.clone(),
remote_encryption_key: self.remote_encryption_key.clone(),
push_operations_threshold: None,
pull_bytes_threshold: None,
};
let sync =
turso_sync_sdk_kit::rsapi::TursoDatabaseSync::<Bytes>::new(db_config, sync_config)
.map_err(Error::from)?;
let io_worker = IoWorker::spawn(sync.clone(), url, self.auth_token.clone());
let op = sync.create();
drive_operation(op, io_worker.clone()).await?;
Ok(Database {
sync,
io: io_worker,
})
}
}
#[derive(Clone)]
pub struct Database {
sync: Arc<turso_sync_sdk_kit::rsapi::TursoDatabaseSync<Bytes>>,
io: Arc<IoWorker>,
}
impl Database {
pub async fn push(&self) -> Result<()> {
let op = self.sync.push_changes();
drive_operation(op, self.io.clone()).await?;
Ok(())
}
pub async fn pull(&self) -> Result<bool> {
let op = self.sync.wait_changes();
let result = drive_operation_result(op, self.io.clone()).await?;
let mut has_changes = false;
if let Some(
turso_sync_sdk_kit::turso_async_operation::TursoAsyncOperationResult::Changes {
changes,
},
) = result
{
if !changes.empty() {
has_changes = true;
let op_apply = self.sync.apply_changes(changes);
drive_operation(op_apply, self.io.clone()).await?;
}
}
Ok(has_changes)
}
pub async fn checkpoint(&self) -> Result<()> {
let op = self.sync.checkpoint();
drive_operation(op, self.io.clone()).await?;
Ok(())
}
pub async fn stats(&self) -> Result<DatabaseSyncStats> {
let op = self.sync.stats();
let result = drive_operation_result(op, self.io.clone()).await?;
match result {
Some(turso_sync_sdk_kit::turso_async_operation::TursoAsyncOperationResult::Stats {
stats,
}) => Ok(stats),
_ => Err(Error::Misuse(
"unexpected result type from stats operation".to_string(),
)),
}
}
pub async fn connect(&self) -> Result<Connection> {
let op = self.sync.connect();
let result = drive_operation_result(op, self.io.clone()).await?;
match result {
Some(
turso_sync_sdk_kit::turso_async_operation::TursoAsyncOperationResult::Connection {
connection,
},
) => {
let io = self.io.clone();
let extra_io = Arc::new(move |waker| {
io.register(waker);
io.kick();
Ok(())
});
Ok(Connection::create(connection, Some(extra_io)))
}
_ => Err(Error::Misuse(
"unexpected result type from connect operation".to_string(),
)),
}
}
}
async fn drive_operation(
op: Box<turso_sync_sdk_kit::turso_async_operation::TursoDatabaseAsyncOperation>,
io: Arc<IoWorker>,
) -> Result<()> {
let fut = AsyncOpFuture::new(op, io);
fut.await.map(|_| ())
}
async fn drive_operation_result(
op: Box<turso_sync_sdk_kit::turso_async_operation::TursoDatabaseAsyncOperation>,
io: Arc<IoWorker>,
) -> Result<Option<turso_sync_sdk_kit::turso_async_operation::TursoAsyncOperationResult>> {
let fut = AsyncOpFuture::new(op, io);
fut.await
}
struct AsyncOpFuture {
op: Option<Box<turso_sync_sdk_kit::turso_async_operation::TursoDatabaseAsyncOperation>>,
io: Arc<IoWorker>,
}
impl AsyncOpFuture {
fn new(
op: Box<turso_sync_sdk_kit::turso_async_operation::TursoDatabaseAsyncOperation>,
io: Arc<IoWorker>,
) -> Self {
Self { op: Some(op), io }
}
}
impl Future for AsyncOpFuture {
type Output =
Result<Option<turso_sync_sdk_kit::turso_async_operation::TursoAsyncOperationResult>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
let Some(op) = &this.op else {
return Poll::Ready(Err(Error::Misuse(
"operation future has been already completed".to_string(),
)));
};
this.io.register(cx.waker().clone());
match op.resume() {
Ok(turso_sdk_kit::rsapi::TursoStatusCode::Done) => {
let result = op.take_result().map(Some).or_else(|err| match err {
turso_sdk_kit::rsapi::TursoError::Misuse(msg)
if msg.contains("operation has no result") =>
{
Ok(None)
}
other => Err(Error::from(other)),
})?;
this.op.take();
Poll::Ready(Ok(result))
}
Ok(turso_sdk_kit::rsapi::TursoStatusCode::Io) => {
this.io.kick();
Poll::Pending
}
Ok(turso_sdk_kit::rsapi::TursoStatusCode::Row) => {
Poll::Ready(Err(Error::Misuse(
"unexpected row status in sync operation".to_string(),
)))
}
Err(e) => Poll::Ready(Err(Error::from(e))),
}
}
}
fn normalize_base_url(input: &str) -> std::result::Result<String, String> {
let s = input.trim();
let s = if let Some(rest) = s.strip_prefix("libsql://") {
format!("https://{rest}")
} else {
s.to_string()
};
if !(s.starts_with("https://") || s.starts_with("http://")) {
return Err(format!("unsupported remote URL scheme: {input}"));
}
let base = s.trim_end_matches('/').to_string();
Ok(base)
}
struct IoWorker {
sync: Arc<turso_sync_sdk_kit::rsapi::TursoDatabaseSync<Bytes>>,
base_url: Option<String>,
auth_token: Option<String>,
tx: mpsc::UnboundedSender<()>,
wakers: Arc<Mutex<Vec<Waker>>>,
}
impl IoWorker {
fn spawn(
sync: Arc<turso_sync_sdk_kit::rsapi::TursoDatabaseSync<Bytes>>,
base_url: Option<String>,
auth_token: Option<String>,
) -> Arc<Self> {
let (tx, rx) = mpsc::unbounded_channel::<()>();
let wakers = Arc::new(Mutex::new(Vec::new()));
let worker = Arc::new(Self {
sync,
base_url,
auth_token,
tx,
wakers: wakers.clone(),
});
let worker_clone = worker.clone();
std::thread::Builder::new()
.name("turso-sync-io".to_string())
.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build IO runtime");
rt.block_on(async move {
IoWorker::run_loop(worker_clone, rx, wakers).await;
});
})
.expect("failed to spawn IO worker thread");
worker
}
fn register(&self, waker: Waker) {
let mut wakers = self.wakers.lock().unwrap();
wakers.push(waker);
}
fn kick(&self) {
let _ = self.tx.send(());
}
fn notify_progress(wakers: &Arc<Mutex<Vec<Waker>>>) {
let wakers = {
let mut guard = wakers.lock().unwrap();
std::mem::take(&mut *guard)
};
for w in wakers {
w.wake();
}
}
async fn run_loop(
this: Arc<IoWorker>,
mut rx: mpsc::UnboundedReceiver<()>,
wakers: Arc<Mutex<Vec<Waker>>>,
) {
let mut http_connector = HttpConnector::new();
http_connector.enforce_http(false);
let https: HttpsConnector<HttpConnector> = HttpsConnector::new();
let client: Client<HttpsConnector<HttpConnector>, Full<Bytes>> =
Client::builder(TokioExecutor::new()).build::<_, Full<Bytes>>(https);
while rx.recv().await.is_some() {
let mut made_progress = false;
loop {
let item = this.sync.take_io_item();
let Some(item) = item else {
this.sync.step_io_callbacks();
IoWorker::notify_progress(&wakers);
break;
};
made_progress = true;
match item.get_request() {
turso_sync_sdk_kit::sync_engine_io::SyncEngineIoRequest::Http {
url,
method,
path,
body,
headers,
} => {
IoWorker::process_http(
&this,
&client,
url.as_deref(),
method,
path,
body.as_ref().map(|v| Bytes::from(v.clone())),
headers,
item.get_completion().clone(),
)
.await;
}
turso_sync_sdk_kit::sync_engine_io::SyncEngineIoRequest::FullRead { path } => {
IoWorker::process_full_read(
path,
item.get_completion().clone(),
&this.sync,
)
.await;
}
turso_sync_sdk_kit::sync_engine_io::SyncEngineIoRequest::FullWrite {
path,
content,
} => {
IoWorker::process_full_write(
path,
content,
item.get_completion().clone(),
&this.sync,
)
.await;
}
}
}
if made_progress {
this.sync.step_io_callbacks();
IoWorker::notify_progress(&wakers);
tokio::task::yield_now().await;
}
}
}
#[allow(clippy::too_many_arguments)]
async fn process_http(
this: &Arc<IoWorker>,
client: &Client<HttpsConnector<HttpConnector>, Full<Bytes>>,
url: Option<&str>,
method: &str,
path: &str,
body: Option<Bytes>,
headers: &[(String, String)],
completion: turso_sync_sdk_kit::sync_engine_io::SyncEngineIoCompletion<Bytes>,
) {
let full_url = if path.starts_with("http://") || path.starts_with("https://") {
path.to_string()
} else {
let p = if path.starts_with('/') {
path.to_string()
} else {
format!("/{path}")
};
let Some(url) = this.base_url.as_deref().or(url) else {
completion.poison("remote_url is not available".to_string());
return;
};
format!("{url}{p}")
};
let mut builder = Request::builder().method(method).uri(&full_url);
if let Some(headers_map) = builder.headers_mut() {
for (k, v) in headers {
if let Ok(name) = hyper::header::HeaderName::try_from(k.as_str()) {
if let Ok(value) = hyper::header::HeaderValue::try_from(v.as_str()) {
headers_map.insert(name, value);
}
}
}
if let Some(token) = &this.auth_token {
if !headers_map.contains_key(AUTHORIZATION) {
let value = format!("Bearer {token}");
if let Ok(hv) = hyper::header::HeaderValue::try_from(value.as_str()) {
headers_map.insert(AUTHORIZATION, hv);
}
}
}
}
let req_body = Full::new(body.unwrap_or_default());
let request = match builder.body(req_body) {
Ok(r) => r,
Err(err) => {
completion.poison(format!("failed to build request: {err}"));
this.sync.step_io_callbacks();
return;
}
};
let mut response = match client.request(request).await {
Ok(r) => r,
Err(err) => {
completion.poison(format!("http request failed: {err}"));
this.sync.step_io_callbacks();
return;
}
};
let status = response.status().as_u16();
completion.status(status as u32);
this.sync.step_io_callbacks();
IoWorker::notify_progress(&this.wakers);
while let Some(frame_res) = response.body_mut().frame().await {
match frame_res {
Ok(frame) => {
if let Some(chunk) = frame.data_ref() {
completion.push_buffer(chunk.clone());
this.sync.step_io_callbacks();
IoWorker::notify_progress(&this.wakers);
}
}
Err(err) => {
completion.poison(format!("error reading response body: {err}"));
this.sync.step_io_callbacks();
IoWorker::notify_progress(&this.wakers);
return;
}
}
}
completion.done();
this.sync.step_io_callbacks();
IoWorker::notify_progress(&this.wakers);
}
async fn process_full_read(
path: &str,
completion: turso_sync_sdk_kit::sync_engine_io::SyncEngineIoCompletion<Bytes>,
sync: &Arc<turso_sync_sdk_kit::rsapi::TursoDatabaseSync<Bytes>>,
) {
match tokio::fs::read(path).await {
Ok(content) => {
completion.push_buffer(Bytes::from(content));
completion.done();
}
Err(err) if err.kind() == ErrorKind::NotFound => completion.done(),
Err(err) => {
completion.poison(format!("full read failed for {path}: {err}"));
}
}
sync.step_io_callbacks();
}
async fn process_full_write(
path: &str,
content: &Vec<u8>,
completion: turso_sync_sdk_kit::sync_engine_io::SyncEngineIoCompletion<Bytes>,
sync: &Arc<turso_sync_sdk_kit::rsapi::TursoDatabaseSync<Bytes>>,
) {
match tokio::fs::write(path, content).await {
Ok(_) => {
completion.done();
}
Err(err) => {
completion.poison(format!("full write failed for {path}: {err}"));
}
}
sync.step_io_callbacks();
}
}
#[cfg(test)]
mod tests {
use anyhow::{anyhow, Context, Result};
use rand::{distr::Alphanumeric, Rng};
use reqwest::Client;
use serde_json::json;
use std::{
env,
process::{Child, Command, Stdio},
thread::sleep,
time::Duration,
};
use tempfile::TempDir;
use turso_sync_sdk_kit::rsapi::PartialBootstrapStrategy;
use crate::sync::PartialSyncOpts;
use crate::{Rows, Value};
const ADMIN_URL: &str = "http://localhost:8081";
const USER_URL: &str = "http://localhost:8080";
fn random_str() -> String {
rand::rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect()
}
async fn handle_response(resp: reqwest::Response) -> Result<()> {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
if status == 400 && text.contains("already exists") {
return Ok(());
}
if !status.is_success() {
return Err(anyhow!("request failed: {status} {text}"));
}
Ok(())
}
pub struct TursoServer {
user_url: String,
db_url: String,
host: String,
server: Option<Child>,
client: Client,
}
impl TursoServer {
pub async fn new() -> Result<Self> {
let client = Client::new();
if env::var("LOCAL_SYNC_SERVER").is_err() {
let name = random_str();
let tokens: Vec<&str> = USER_URL.split("://").collect();
handle_response(
client
.post(format!("{ADMIN_URL}/v1/tenants/{name}"))
.send()
.await?,
)
.await?;
handle_response(
client
.post(format!("{ADMIN_URL}/v1/tenants/{name}/groups/{name}"))
.send()
.await?,
)
.await?;
handle_response(
client
.post(format!(
"{ADMIN_URL}/v1/tenants/{name}/groups/{name}/databases/{name}"
))
.send()
.await?,
)
.await?;
Ok(Self {
user_url: USER_URL.to_string(),
db_url: format!("{}://{}--{}--{}.{}", tokens[0], name, name, name, tokens[1]),
host: format!("{name}--{name}--{name}.localhost"),
server: None,
client,
})
} else {
let port: u16 = rand::rng().random_range(10_000..=65_535);
let server_bin = env::var("LOCAL_SYNC_SERVER").unwrap();
let child = Command::new(server_bin)
.args(["--sync-server", &format!("0.0.0.0:{port}")])
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.context("failed to spawn local sync server")?;
let user_url = format!("http://localhost:{port}");
loop {
if client.get(&user_url).send().await.is_ok() {
break;
}
sleep(Duration::from_millis(100));
}
Ok(Self {
user_url: user_url.clone(),
db_url: user_url,
host: String::new(),
server: Some(child),
client,
})
}
}
pub fn db_url(&self) -> &str {
&self.db_url
}
pub async fn db_sql(&self, sql: &str) -> Result<Vec<Vec<Value>>> {
let resp = self
.client
.post(format!("{}/v2/pipeline", self.user_url))
.header("Host", &self.host)
.json(&json!({
"requests": [{
"type": "execute",
"stmt": { "sql": sql }
}]
}))
.send()
.await?
.error_for_status()?;
let value: serde_json::Value = resp.json().await?;
let result = &value["results"][0];
if result["type"] != "ok" {
return Err(anyhow!("remote sql execution failed: {value}"));
}
let rows = result["response"]["result"]["rows"]
.as_array()
.ok_or_else(|| anyhow!("invalid response shape"))?;
Ok(rows
.iter()
.map(|row| {
row.as_array()
.unwrap()
.iter()
.map(|cell| match cell["value"].clone() {
serde_json::Value::Null => Value::Null,
serde_json::Value::Number(number) => {
if number.is_i64() {
Value::Integer(number.as_i64().unwrap())
} else {
Value::Real(number.as_f64().unwrap())
}
}
serde_json::Value::String(s) => Value::Text(s),
_ => panic!("unexpected json output"),
})
.collect()
})
.collect())
}
}
impl Drop for TursoServer {
fn drop(&mut self) {
if let Some(child) = &mut self.server {
let _ = child.kill();
}
}
}
async fn all_rows(mut rows: Rows) -> Result<Vec<Vec<Value>>> {
let mut result = Vec::new();
while let Some(row) = rows.next().await? {
result.push(row.values.into_iter().map(|x| x.into()).collect());
}
Ok(result)
}
#[tokio::test]
pub async fn test_sync_bootstrap() {
let _ = tracing_subscriber::fmt::try_init();
let server = TursoServer::new().await.unwrap();
server.db_sql("CREATE TABLE t(x)").await.unwrap();
server
.db_sql("INSERT INTO t VALUES ('hello'), ('turso'), ('sync')")
.await
.unwrap();
server.db_sql("SELECT * FROM t").await.unwrap();
let db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = db.connect().await.unwrap();
let rows = conn.query("SELECT * FROM t", ()).await.unwrap();
let all = all_rows(rows).await.unwrap();
assert_eq!(
all,
vec![
vec![Value::Text("hello".to_string())],
vec![Value::Text("turso".to_string())],
vec![Value::Text("sync".to_string())],
]
);
}
#[tokio::test]
pub async fn test_sync_bootstrap_persistence() {
let _ = tracing_subscriber::fmt::try_init();
let dir = TempDir::new().unwrap();
let server = TursoServer::new().await.unwrap();
server.db_sql("CREATE TABLE t(x)").await.unwrap();
server
.db_sql("INSERT INTO t VALUES ('hello'), ('turso'), ('sync')")
.await
.unwrap();
server.db_sql("SELECT * FROM t").await.unwrap();
let db = crate::sync::Builder::new_remote(dir.path().join("local.db").to_str().unwrap())
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = db.connect().await.unwrap();
let rows = conn.query("SELECT * FROM t", ()).await.unwrap();
let all = all_rows(rows).await.unwrap();
assert_eq!(
all,
vec![
vec![Value::Text("hello".to_string())],
vec![Value::Text("turso".to_string())],
vec![Value::Text("sync".to_string())],
]
);
}
#[tokio::test]
pub async fn test_sync_config_persistence() {
let _ = tracing_subscriber::fmt::try_init();
let dir = TempDir::new().unwrap();
let server = TursoServer::new().await.unwrap();
server.db_sql("CREATE TABLE t(x)").await.unwrap();
server.db_sql("INSERT INTO t VALUES (42)").await.unwrap();
{
let db1 =
crate::sync::Builder::new_remote(dir.path().join("local.db").to_str().unwrap())
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = db1.connect().await.unwrap();
let rows = conn.query("SELECT * FROM t", ()).await.unwrap();
let all = all_rows(rows).await.unwrap();
assert_eq!(all, vec![vec![Value::Integer(42)],]);
}
server.db_sql("INSERT INTO t VALUES (41)").await.unwrap();
{
let db2 =
crate::sync::Builder::new_remote(dir.path().join("local.db").to_str().unwrap())
.build()
.await
.unwrap();
db2.pull().await.unwrap();
let conn = db2.connect().await.unwrap();
let rows = conn.query("SELECT * FROM t", ()).await.unwrap();
let all = all_rows(rows).await.unwrap();
assert_eq!(
all,
vec![vec![Value::Integer(42)], vec![Value::Integer(41)],]
);
}
}
#[tokio::test]
pub async fn test_sync_pull() {
let _ = tracing_subscriber::fmt::try_init();
let server = TursoServer::new().await.unwrap();
server.db_sql("CREATE TABLE t(x)").await.unwrap();
server
.db_sql("INSERT INTO t VALUES ('hello'), ('turso'), ('sync')")
.await
.unwrap();
server.db_sql("SELECT * FROM t").await.unwrap();
let db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = db.connect().await.unwrap();
let rows = conn.query("SELECT * FROM t", ()).await.unwrap();
let all = all_rows(rows).await.unwrap();
assert_eq!(
all,
vec![
vec![Value::Text("hello".to_string())],
vec![Value::Text("turso".to_string())],
vec![Value::Text("sync".to_string())],
]
);
server
.db_sql("INSERT INTO t VALUES ('pull works')")
.await
.unwrap();
let rows = conn.query("SELECT * FROM t", ()).await.unwrap();
let all = all_rows(rows).await.unwrap();
assert_eq!(
all,
vec![
vec![Value::Text("hello".to_string())],
vec![Value::Text("turso".to_string())],
vec![Value::Text("sync".to_string())],
]
);
db.pull().await.unwrap();
let rows = conn.query("SELECT * FROM t", ()).await.unwrap();
let all = all_rows(rows).await.unwrap();
assert_eq!(
all,
vec![
vec![Value::Text("hello".to_string())],
vec![Value::Text("turso".to_string())],
vec![Value::Text("sync".to_string())],
vec![Value::Text("pull works".to_string())],
]
);
}
#[tokio::test]
pub async fn test_sync_push() {
let _ = tracing_subscriber::fmt::try_init();
let server = TursoServer::new().await.unwrap();
server.db_sql("CREATE TABLE t(x)").await.unwrap();
server
.db_sql("INSERT INTO t VALUES ('hello'), ('turso'), ('sync')")
.await
.unwrap();
server.db_sql("SELECT * FROM t").await.unwrap();
let db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = db.connect().await.unwrap();
let rows = conn.query("SELECT * FROM t", ()).await.unwrap();
let all = all_rows(rows).await.unwrap();
assert_eq!(
all,
vec![
vec![Value::Text("hello".to_string())],
vec![Value::Text("turso".to_string())],
vec![Value::Text("sync".to_string())],
]
);
conn.execute("INSERT INTO t VALUES ('push works')", ())
.await
.unwrap();
let all = server.db_sql("SELECT * FROM t").await.unwrap();
assert_eq!(
all,
vec![
vec![Value::Text("hello".to_string())],
vec![Value::Text("turso".to_string())],
vec![Value::Text("sync".to_string())],
]
);
db.push().await.unwrap();
let rows = conn.query("SELECT * FROM t", ()).await.unwrap();
let all = all_rows(rows).await.unwrap();
assert_eq!(
all,
vec![
vec![Value::Text("hello".to_string())],
vec![Value::Text("turso".to_string())],
vec![Value::Text("sync".to_string())],
vec![Value::Text("push works".to_string())],
]
);
}
#[tokio::test]
pub async fn test_sync_checkpoint() {
let _ = tracing_subscriber::fmt::try_init();
let server = TursoServer::new().await.unwrap();
let db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = db.connect().await.unwrap();
conn.execute("CREATE TABLE t(x)", ()).await.unwrap();
for i in 0..1024 {
conn.execute("INSERT INTO t VALUES (?)", (i,))
.await
.unwrap();
}
let stats1 = db.stats().await.unwrap();
assert!(stats1.main_wal_size > 1024 * 1024);
db.checkpoint().await.unwrap();
let stats2 = db.stats().await.unwrap();
assert!(stats2.main_wal_size < 8 * 1024);
}
#[tokio::test]
pub async fn test_sync_partial() {
let _ = tracing_subscriber::fmt::try_init();
let server = TursoServer::new().await.unwrap();
server.db_sql("CREATE TABLE t(x)").await.unwrap();
server
.db_sql("INSERT INTO t SELECT randomblob(1024) FROM generate_series(1, 2000)")
.await
.unwrap();
{
let full_db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = full_db.connect().await.unwrap();
let _ = all_rows(
conn.query("SELECT LENGTH(x) FROM t LIMIT 1", ())
.await
.unwrap(),
)
.await
.unwrap();
assert!(full_db.stats().await.unwrap().network_received_bytes > 2000 * 1024);
}
{
let partial_db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.with_partial_sync_opts_experimental(PartialSyncOpts {
bootstrap_strategy: Some(PartialBootstrapStrategy::Prefix {
length: 128 * 1024,
}),
segment_size: 128 * 1024,
prefetch: false,
})
.build()
.await
.unwrap();
let conn = partial_db.connect().await.unwrap();
let _ = all_rows(
conn.query("SELECT LENGTH(x) FROM t LIMIT 1", ())
.await
.unwrap(),
)
.await
.unwrap();
assert!(partial_db.stats().await.unwrap().network_received_bytes < 256 * (1024 + 10));
let before = tokio::time::Instant::now();
let all = all_rows(
conn.query("SELECT SUM(LENGTH(x)) FROM t", ())
.await
.unwrap(),
)
.await
.unwrap();
println!(
"duration: {:?}",
tokio::time::Instant::now().duration_since(before)
);
assert_eq!(all, vec![vec![Value::Integer(2000 * 1024)]]);
assert!(partial_db.stats().await.unwrap().network_received_bytes > 2000 * 1024);
}
}
#[tokio::test]
pub async fn test_sync_partial_segment_size() {
let _ = tracing_subscriber::fmt::try_init();
let server = TursoServer::new().await.unwrap();
server.db_sql("CREATE TABLE t(x)").await.unwrap();
server
.db_sql("INSERT INTO t SELECT randomblob(1024) FROM generate_series(1, 256)")
.await
.unwrap();
{
let full_db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = full_db.connect().await.unwrap();
let _ = all_rows(
conn.query("SELECT LENGTH(x) FROM t LIMIT 1", ())
.await
.unwrap(),
)
.await
.unwrap();
assert!(full_db.stats().await.unwrap().network_received_bytes > 256 * 1024);
}
{
let partial_db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.with_partial_sync_opts_experimental(PartialSyncOpts {
bootstrap_strategy: Some(PartialBootstrapStrategy::Prefix {
length: 128 * 1024,
}),
segment_size: 4 * 1024,
prefetch: false,
})
.build()
.await
.unwrap();
let conn = partial_db.connect().await.unwrap();
let _ = all_rows(
conn.query("SELECT LENGTH(x) FROM t LIMIT 1", ())
.await
.unwrap(),
)
.await
.unwrap();
assert!(partial_db.stats().await.unwrap().network_received_bytes < 128 * 1024 * 3 / 2);
let before = tokio::time::Instant::now();
let all = all_rows(
conn.query("SELECT SUM(LENGTH(x)) FROM t", ())
.await
.unwrap(),
)
.await
.unwrap();
println!(
"duration segment size: {:?}",
tokio::time::Instant::now().duration_since(before)
);
assert_eq!(all, vec![vec![Value::Integer(256 * 1024)]]);
assert!(partial_db.stats().await.unwrap().network_received_bytes > 256 * 1024);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
pub async fn test_sync_partial_prefetch() {
let _ = tracing_subscriber::fmt::try_init();
let server = TursoServer::new().await.unwrap();
server.db_sql("CREATE TABLE t(x)").await.unwrap();
server
.db_sql("INSERT INTO t SELECT randomblob(1024) FROM generate_series(1, 2000)")
.await
.unwrap();
{
let full_db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = full_db.connect().await.unwrap();
let _ = all_rows(
conn.query("SELECT LENGTH(x) FROM t LIMIT 1", ())
.await
.unwrap(),
)
.await
.unwrap();
assert!(full_db.stats().await.unwrap().network_received_bytes > 2000 * 1024);
}
{
let partial_db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.with_partial_sync_opts_experimental(PartialSyncOpts {
bootstrap_strategy: Some(PartialBootstrapStrategy::Prefix {
length: 128 * 1024,
}),
segment_size: 128 * 1024,
prefetch: true,
})
.build()
.await
.unwrap();
let conn = partial_db.connect().await.unwrap();
let _ = all_rows(
conn.query("SELECT LENGTH(x) FROM t LIMIT 1", ())
.await
.unwrap(),
)
.await
.unwrap();
assert!(partial_db.stats().await.unwrap().network_received_bytes < 1300 * (1024 + 10));
let before = tokio::time::Instant::now();
let all = all_rows(
conn.query("SELECT SUM(LENGTH(x)) FROM t", ())
.await
.unwrap(),
)
.await
.unwrap();
println!(
"duration prefetch: {:?}",
tokio::time::Instant::now().duration_since(before)
);
assert_eq!(all, vec![vec![Value::Integer(2000 * 1024)]]);
assert!(partial_db.stats().await.unwrap().network_received_bytes > 2000 * 1024);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
pub async fn test_sync_parallel_writes_with_sync_ops() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;
let _ = tracing_subscriber::fmt::try_init();
let server = TursoServer::new().await.unwrap();
let db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = db.connect().await.unwrap();
conn.execute(
"CREATE TABLE test_data (id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL)",
(),
)
.await
.unwrap();
let payload = "X".repeat(200 * 1024);
let done = Arc::new(AtomicBool::new(false));
let sync_lock = Arc::new(TokioMutex::new(()));
let sync_db = db.clone();
let sync_done = done.clone();
let sync_lock_clone = sync_lock.clone();
let sync_task = tokio::spawn(async move {
let mut cycle = 0u32;
while !sync_done.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(100)).await;
let _guard = sync_lock_clone.lock().await;
eprintln!("sync cycle {cycle}: push");
if let Err(e) = sync_db.push().await {
eprintln!("push error (cycle {cycle}): {e}");
}
eprintln!("sync cycle {cycle}: pull");
if let Err(e) = sync_db.pull().await {
eprintln!("pull error (cycle {cycle}): {e}");
}
eprintln!("sync cycle {cycle}: checkpoint");
if let Err(e) = sync_db.checkpoint().await {
eprintln!("checkpoint error (cycle {cycle}): {e}");
}
cycle += 1;
}
cycle
});
let mut write_handles = Vec::new();
let mut connections = Vec::new();
let (conn_cnt, iterations_cnt, after_cnt) = (8u32, 100u32, 100u32);
for _ in 0..conn_cnt {
let db = db.clone();
let conn = db.connect().await.unwrap();
conn.execute("PRAGMA busy_timeout=5000", ()).await.unwrap();
connections.push(Some((db, conn)));
}
for conn_id in 0..conn_cnt {
let (_, conn) = connections[conn_id as usize].take().unwrap();
let payload = payload.clone();
write_handles.push(tokio::spawn(async move {
for row_id in 0..iterations_cnt {
let tag = format!("conn{conn_id}_row{row_id}");
let data = format!("{tag}_{payload}");
loop {
match conn
.execute(
"INSERT INTO test_data (payload) VALUES (?)",
crate::params::Params::Positional(vec![Value::Text(data.clone())]),
)
.await
{
Ok(_) => break,
Err(crate::Error::Busy(_)) => {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
Err(e) => panic!("insert failed (conn{conn_id}, row{row_id}): {e:?}"),
}
}
}
}));
}
for h in write_handles {
h.await.unwrap();
}
for i in 0..after_cnt {
let data = format!("sequential_{i}_{payload}");
conn.execute(
"INSERT INTO test_data (payload) VALUES (?)",
crate::params::Params::Positional(vec![Value::Text(data)]),
)
.await
.unwrap();
}
done.store(true, Ordering::Relaxed);
let sync_cycles = sync_task.await.unwrap();
eprintln!("completed {sync_cycles} sync cycles during writes");
let rows = conn
.query("SELECT count(*) FROM test_data", ())
.await
.unwrap();
let all = all_rows(rows).await.unwrap();
assert_eq!(
all,
vec![vec![Value::Integer(
(after_cnt + conn_cnt * iterations_cnt) as i64
)]]
);
let stats = db.stats().await.unwrap();
eprintln!(
"WAL size after all writes: {} bytes ({:.2} KB)",
stats.main_wal_size,
stats.main_wal_size as f64 / 1024.0
);
}
#[tokio::test]
pub async fn test_sync_pull_after_local_ddl_and_remote_writes() {
let _ = tracing_subscriber::fmt::try_init();
let server = TursoServer::new().await.unwrap();
server
.db_sql("CREATE TABLE t(x TEXT PRIMARY KEY, y)")
.await
.unwrap();
server
.db_sql("INSERT INTO t VALUES ('a', '1'), ('b', '2'), ('c', '3')")
.await
.unwrap();
let db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = db.connect().await.unwrap();
let rows = all_rows(conn.query("SELECT * FROM t", ()).await.unwrap())
.await
.unwrap();
assert_eq!(rows.len(), 3);
db.push().await.unwrap();
db.pull().await.unwrap();
conn.execute(
"INSERT INTO t VALUES ('d', '4-local'), ('e', '5-local')",
(),
)
.await
.unwrap();
conn.execute("CREATE TABLE t2(y INTEGER, z TEXT)", ())
.await
.unwrap();
conn.execute("CREATE TABLE t3(id INTEGER PRIMARY KEY, payload TEXT)", ())
.await
.unwrap();
conn.execute("INSERT INTO t2 VALUES (1, 'hello'), (2, 'world')", ())
.await
.unwrap();
conn.execute(
"INSERT INTO t3 VALUES (100, 'payload1'), (200, 'payload2')",
(),
)
.await
.unwrap();
server
.db_sql("INSERT INTO t VALUES ('e', '5-remote'), ('f', '6-remote')")
.await
.unwrap();
db.pull().await.unwrap();
let rows_t = all_rows(
conn.query("SELECT x, y FROM t ORDER BY x", ())
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(
rows_t,
vec![
vec![Value::Text("a".to_string()), Value::Text("1".to_string())],
vec![Value::Text("b".to_string()), Value::Text("2".to_string())],
vec![Value::Text("c".to_string()), Value::Text("3".to_string())],
vec![
Value::Text("d".to_string()),
Value::Text("4-local".to_string())
],
vec![
Value::Text("e".to_string()),
Value::Text("5-local".to_string())
],
vec![
Value::Text("f".to_string()),
Value::Text("6-remote".to_string())
],
]
);
let rows_t2 = all_rows(
conn.query("SELECT y, z FROM t2 ORDER BY y", ())
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(
rows_t2,
vec![
vec![Value::Integer(1), Value::Text("hello".to_string())],
vec![Value::Integer(2), Value::Text("world".to_string())],
]
);
let rows_t3 = all_rows(
conn.query("SELECT id, payload FROM t3 ORDER BY id", ())
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(
rows_t3,
vec![
vec![Value::Integer(100), Value::Text("payload1".to_string())],
vec![Value::Integer(200), Value::Text("payload2".to_string())],
]
);
}
#[tokio::test]
pub async fn test_sync_pull_alter_table_add_column_idempotent() {
let _ = tracing_subscriber::fmt::try_init();
let server = TursoServer::new().await.unwrap();
server
.db_sql("CREATE TABLE t(x TEXT PRIMARY KEY, y TEXT)")
.await
.unwrap();
server
.db_sql("INSERT INTO t VALUES ('a', 'alpha')")
.await
.unwrap();
let db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = db.connect().await.unwrap();
let rows = all_rows(conn.query("SELECT x, y FROM t", ()).await.unwrap())
.await
.unwrap();
assert_eq!(
rows,
vec![vec![
Value::Text("a".to_string()),
Value::Text("alpha".to_string())
]]
);
server
.db_sql("ALTER TABLE t ADD COLUMN z TEXT")
.await
.unwrap();
server
.db_sql("INSERT INTO t VALUES ('b', 'beta', 'from-remote')")
.await
.unwrap();
conn.execute("ALTER TABLE t ADD COLUMN z TEXT", ())
.await
.unwrap();
db.pull().await.unwrap();
let rows = all_rows(
conn.query("SELECT x, y, z FROM t ORDER BY x", ())
.await
.unwrap(),
)
.await
.unwrap();
assert_eq!(rows.len(), 2);
assert_eq!(rows[0][0], Value::Text("a".to_string()));
assert_eq!(rows[1][0], Value::Text("b".to_string()));
assert_eq!(rows[1][2], Value::Text("from-remote".to_string()));
}
#[tokio::test]
pub async fn test_sync_push_alter_table_add_column_idempotent() {
let _ = tracing_subscriber::fmt::try_init();
let server = TursoServer::new().await.unwrap();
server
.db_sql("CREATE TABLE t(x TEXT PRIMARY KEY, y TEXT)")
.await
.unwrap();
server
.db_sql("INSERT INTO t VALUES ('a', 'alpha')")
.await
.unwrap();
let db = crate::sync::Builder::new_remote(":memory:")
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
let conn = db.connect().await.unwrap();
let rows = all_rows(conn.query("SELECT x, y FROM t", ()).await.unwrap())
.await
.unwrap();
assert_eq!(
rows,
vec![vec![
Value::Text("a".to_string()),
Value::Text("alpha".to_string())
]]
);
server
.db_sql("ALTER TABLE t ADD COLUMN z TEXT")
.await
.unwrap();
conn.execute("ALTER TABLE t ADD COLUMN z TEXT", ())
.await
.unwrap();
conn.execute("INSERT INTO t VALUES ('b', 'beta', 'from-local')", ())
.await
.unwrap();
db.push().await.unwrap();
let remote_rows = server
.db_sql("SELECT x, y, z FROM t ORDER BY x")
.await
.unwrap();
assert_eq!(remote_rows.len(), 2);
assert_eq!(remote_rows[1][0], Value::Text("b".to_string()));
assert_eq!(remote_rows[1][1], Value::Text("beta".to_string()));
assert_eq!(remote_rows[1][2], Value::Text("from-local".to_string()));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
pub async fn test_sync_pull_panics_after_full_backfill() {
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
let _ = tracing_subscriber::fmt::try_init();
let dir = TempDir::new().unwrap();
let server = TursoServer::new().await.unwrap();
let db = crate::sync::Builder::new_remote(dir.path().join("local.db").to_str().unwrap())
.with_remote_url(server.db_url())
.build()
.await
.unwrap();
server.db_sql("CREATE TABLE t(y BLOB)").await.unwrap();
db.pull().await.unwrap();
let conn = db.connect().await.unwrap();
let done = Arc::new(AtomicBool::new(false));
let applied_total = Arc::new(AtomicI64::new(0));
let mut readers = Vec::new();
for _ in 0..4 {
let reader_conn = db.connect().await.unwrap();
let done = done.clone();
let applied_total = applied_total.clone();
readers.push(tokio::spawn(async move {
let mut last_seen: i64 = 0;
while !done.load(Ordering::Relaxed) {
let sleep_ms = rand::rng().random_range(0..=4);
tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
let sql = match rand::rng().random_range(0..3) {
0 => "SELECT count(*) FROM t",
1 => "SELECT count(length(y)) FROM t",
_ => "SELECT count(*) FROM t WHERE length(y) > 0",
};
let rows = match reader_conn.query(sql, ()).await {
Ok(rows) => rows,
Err(crate::Error::Busy(_)) => continue,
Err(e) => panic!("reader query failed: {e:?}"),
};
let all = all_rows(rows).await.unwrap();
let Value::Integer(n) = all[0][0] else {
panic!("unexpected reader value: {:?}", all[0][0]);
};
let upper = applied_total.load(Ordering::Acquire);
assert!(
n >= last_seen && n <= upper,
"reader saw inconsistent count {n}, last_seen={last_seen}, upper={upper}"
);
last_seen = n;
}
}));
}
let mut total: i64 = 0;
for _ in 0..25 {
let cnt: u16 = rand::rng().random_range(1..=16);
let size: u32 = rand::rng().random_range(1..=4 * 1024);
server
.db_sql(&format!(
"INSERT INTO t SELECT randomblob({size}) FROM generate_series(1, {cnt})"
))
.await
.unwrap();
total += cnt as i64;
applied_total.store(total, Ordering::Release);
db.pull().await.unwrap();
let _ = db.checkpoint().await;
let rows = all_rows(conn.query("SELECT count(*) FROM t", ()).await.unwrap())
.await
.unwrap();
assert_eq!(rows, vec![vec![Value::Integer(total)]]);
}
done.store(true, Ordering::Relaxed);
for h in readers {
h.await.unwrap();
}
}
}