use apalis_libsql::LibsqlStorage;
use libsql::Builder;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tempfile::TempDir;
/// Payload type used as the job argument in the Turso Cloud tests.
///
/// It is serialized when pushed to the storage and deserialized again from
/// the polled task's `args` in the lifecycle test.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
struct CloudTask {
    /// Text carried by the task.
    pub message: String,
    /// Integer carried by the task.
    pub value: i32,
}
/// Reads the Turso connection settings from the process environment.
///
/// Returns `Some((url, token))` only when both `TURSO_DATABASE_URL` and
/// `TURSO_AUTH_TOKEN` are set, are valid Unicode, and are not blank.
/// Returns `None` otherwise, which callers use as the signal to skip the
/// cloud tests.
fn get_turso_credentials() -> Option<(String, String)> {
    // A blank value is treated as missing: an undefined secret is often
    // exported as an empty string (e.g. by CI), and connecting with blank
    // credentials would fail the test instead of skipping it.
    let read_non_blank = |key: &str| {
        std::env::var(key)
            .ok()
            .filter(|value| !value.trim().is_empty())
    };

    let url = read_non_blank("TURSO_DATABASE_URL")?;
    let token = read_non_blank("TURSO_AUTH_TOKEN")?;
    Some((url, token))
}
/// Smoke test: builds an embedded replica against Turso Cloud, performs an
/// initial sync, and checks that `SELECT 1` really yields the value `1`.
///
/// Returns `Ok(())` without doing anything when the credentials are not
/// available in the environment.
#[tokio::test(flavor = "multi_thread")]
async fn test_turso_cloud_connection() -> Result<(), Box<dyn std::error::Error>> {
    let Some((url, token)) = get_turso_credentials() else {
        println!("Skipping: TURSO_DATABASE_URL and TURSO_AUTH_TOKEN not set");
        return Ok(());
    };

    // The local replica file is created inside a fresh temporary directory.
    let temp_dir = TempDir::new()?;
    let local_path = temp_dir.path().join("connection_test.db");
    // Report a non-UTF-8 path as a test error instead of panicking.
    let local_path_str = local_path
        .to_str()
        .ok_or("temporary replica path is not valid UTF-8")?;

    println!("Creating embedded replica at: {}", local_path_str);
    println!("Syncing with: {}", url);

    let db = Builder::new_remote_replica(local_path_str, url, token)
        .sync_interval(Duration::from_secs(60))
        .build()
        .await?;
    println!("✓ Connected to Turso Cloud");

    db.sync().await?;
    println!("✓ Initial sync complete");

    // Verify the query result instead of discarding it, so the success
    // message below is only printed when a row with the value 1 came back.
    let conn = db.connect()?;
    let mut rows = conn.query("SELECT 1", libsql::params![]).await?;
    let row = rows.next().await?.ok_or("SELECT 1 returned no rows")?;
    let one: i64 = row.get(0)?;
    assert_eq!(one, 1, "SELECT 1 should return 1");
    println!("✓ Basic query works");

    Ok(())
}
/// End-to-end check of a task's lifecycle on Turso Cloud: create the schema,
/// push one `CloudTask`, confirm a pending row exists, then poll the storage
/// and verify the decoded payload.
///
/// Returns `Ok(())` without doing anything when the credentials are not
/// available in the environment. A poll error fails the test; a poll timeout
/// or an ended stream only prints a warning, as in the original design.
#[tokio::test(flavor = "multi_thread")]
async fn test_turso_cloud_task_lifecycle() -> Result<(), Box<dyn std::error::Error>> {
    use apalis_core::backend::TaskSink;
    use apalis_core::worker::context::WorkerContext;
    use futures::StreamExt;

    let Some((url, token)) = get_turso_credentials() else {
        println!("Skipping: TURSO_DATABASE_URL and TURSO_AUTH_TOKEN not set");
        return Ok(());
    };

    let temp_dir = TempDir::new()?;
    let local_path = temp_dir.path().join("task_lifecycle_test.db");
    // Report a non-UTF-8 path as a test error instead of panicking.
    let local_path_str = local_path
        .to_str()
        .ok_or("temporary replica path is not valid UTF-8")?;

    println!("\n=== Turso Cloud Task Lifecycle Test ===");
    println!("Local replica: {}", local_path_str);

    let db = Builder::new_remote_replica(local_path_str, url, token)
        .sync_interval(Duration::from_secs(5))
        .build()
        .await?;
    db.sync().await?;
    println!("✓ Connected and synced");

    // The storage is handed a `&'static` database reference, so the handle is
    // intentionally leaked for the remainder of the test process.
    let db_static: &'static libsql::Database = Box::leak(Box::new(db));
    let storage = LibsqlStorage::<CloudTask, ()>::new(db_static);
    storage.setup().await?;
    println!("✓ Schema created");

    db_static.sync().await?;
    println!("✓ Schema synced to cloud");

    let task = CloudTask {
        message: "Hello from Turso Cloud!".to_string(),
        value: 42,
    };
    let mut sink_storage = storage.clone();
    sink_storage.push(task.clone()).await?;
    println!("✓ Task pushed");

    db_static.sync().await?;
    println!("✓ Task synced to cloud");

    // The count check must always run; a missing row is a test error rather
    // than a silently skipped assertion.
    let conn = db_static.connect()?;
    let mut rows = conn
        .query(
            "SELECT COUNT(*) FROM Jobs WHERE status = 'Pending'",
            libsql::params![],
        )
        .await?;
    let row = rows
        .next()
        .await?
        .ok_or("COUNT(*) query returned no row")?;
    let count: i64 = row.get(0)?;
    println!("✓ Found {} pending task(s) in cloud", count);
    assert!(count >= 1, "Should have at least 1 pending task");

    let worker = WorkerContext::new::<&'static str>("turso-test-worker");
    let mut task_stream = storage.poll_default(&worker).boxed();

    // Keep polling until a task arrives or the overall budget (the original
    // 2s + 5s) is used up. Every item is inspected: `None` items are skipped
    // and polling really is retried, a task is validated even when it is the
    // first item, and any poll error fails the test.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(7);
    loop {
        match tokio::time::timeout_at(deadline, task_stream.next()).await {
            Ok(Some(Ok(Some(polled_task)))) => {
                let decoded: CloudTask = serde_json::from_slice(&polled_task.args)?;
                println!("✓ Polled task: {:?}", decoded);
                assert_eq!(decoded.message, "Hello from Turso Cloud!");
                assert_eq!(decoded.value, 42);
                break;
            }
            Ok(Some(Ok(None))) => {
                println!("⚠ Got None (registration), trying again...");
            }
            Ok(Some(Err(e))) => {
                println!("✗ Poll error: {}", e);
                return Err(e.into());
            }
            Ok(None) => {
                println!("⚠ Stream ended");
                break;
            }
            Err(_) => {
                println!(
                    "⚠ Timeout waiting for task (this may be OK if task was already processed)"
                );
                break;
            }
        }
    }

    db_static.sync().await?;
    println!("✓ Final sync complete");
    println!("\n=== Turso Cloud Test PASSED ===\n");
    Ok(())
}
/// Roundtrip check for the embedded replica: insert a uniquely keyed row,
/// sync, read the row back, then delete it and sync again.
///
/// Returns `Ok(())` without doing anything when the credentials are not
/// available in the environment. The test fails if the inserted row cannot
/// be read back.
#[tokio::test(flavor = "multi_thread")]
async fn test_turso_sync_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
    let Some((url, token)) = get_turso_credentials() else {
        println!("Skipping: TURSO_DATABASE_URL and TURSO_AUTH_TOKEN not set");
        return Ok(());
    };

    let temp_dir = TempDir::new()?;
    let local_path = temp_dir.path().join("sync_test.db");
    // Report a non-UTF-8 path as a test error instead of panicking.
    let local_path_str = local_path
        .to_str()
        .ok_or("temporary replica path is not valid UTF-8")?;

    println!("\n=== Sync Roundtrip Test ===");
    println!("Local replica: {}", local_path_str);

    let db = Builder::new_remote_replica(local_path_str, url, token)
        .sync_interval(Duration::from_secs(60))
        .build()
        .await?;
    db.sync().await?;
    println!("✓ Initial sync");

    let conn = db.connect()?;
    conn.execute(
        "CREATE TABLE IF NOT EXISTS sync_test (id INTEGER PRIMARY KEY, data TEXT)",
        libsql::params![],
    )
    .await?;

    // Milliseconds since the Unix epoch serve as the row id. Convert once
    // with a checked conversion instead of repeating a truncating `as` cast.
    let millis = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_millis();
    let test_id = i64::try_from(millis)?;

    conn.execute(
        "INSERT INTO sync_test (id, data) VALUES (?1, ?2)",
        libsql::params![test_id, "test data"],
    )
    .await?;
    println!("✓ Inserted test row with id: {}", test_id);

    db.sync().await?;
    println!("✓ Synced to cloud");

    // The row must exist: a missing row is a failure, not a silently
    // skipped assertion.
    let mut rows = conn
        .query(
            "SELECT data FROM sync_test WHERE id = ?1",
            libsql::params![test_id],
        )
        .await?;
    let row = rows
        .next()
        .await?
        .ok_or("inserted row was not found when reading it back")?;
    let data: String = row.get(0)?;
    assert_eq!(data, "test data");
    println!("✓ Read back: {}", data);

    // Remove the row again so repeated runs do not accumulate test data.
    conn.execute(
        "DELETE FROM sync_test WHERE id = ?1",
        libsql::params![test_id],
    )
    .await?;
    db.sync().await?;
    println!("✓ Cleaned up test data");

    println!("\n=== Sync Roundtrip PASSED ===\n");
    Ok(())
}