#[cfg(feature = "database")]
use super::messages::{DatabasePoolConnected, DatabasePoolConnectionFailed};
#[cfg(feature = "database")]
use acton_reactive::prelude::*;
#[cfg(feature = "database")]
use std::sync::Arc;
#[cfg(feature = "database")]
use tokio::sync::RwLock;
#[cfg(feature = "database")]
use tokio_util::sync::CancellationToken;
/// Shared, lock-guarded slot for an optional `sqlx` PostgreSQL pool.
///
/// When a slot is handed to [`DatabasePoolAgent::spawn`], the agent writes the
/// connected pool into it while handling `DatabasePoolConnected`.
#[cfg(feature = "database")]
pub type SharedDbPool = Arc<RwLock<Option<sqlx::PgPool>>>;
/// Actor state for the actor created by [`DatabasePoolAgent::spawn`].
#[cfg(feature = "database")]
#[derive(Debug, Default)]
pub struct DatabasePoolState {
/// Pool stored by the `DatabasePoolConnected` handler.
pub pool: Option<sqlx::PgPool>,
/// Configuration passed to `crate::database::create_pool`; set in `spawn`.
pub config: Option<crate::config::DatabaseConfig>,
/// Set to `true` in `spawn`; cleared by the connected and failed handlers.
pub connecting: bool,
/// Optional external slot that also receives the pool once connected.
pub shared_pool: Option<SharedDbPool>,
/// Token cancelled in `before_stop` to abort an in-flight connection attempt.
pub cancel_token: Option<CancellationToken>,
}
/// Field-less type whose associated function [`DatabasePoolAgent::spawn`]
/// builds and starts the actor that owns the PostgreSQL pool.
#[cfg(feature = "database")]
pub struct DatabasePoolAgent;
#[cfg(feature = "database")]
impl DatabasePoolAgent {
    /// Creates the database pool actor, registers its handlers and starts it.
    ///
    /// The actor state is seeded with `config`, `shared_pool` and a fresh
    /// [`CancellationToken`], and `connecting` is set to `true`.
    ///
    /// Wiring performed here:
    /// - `after_start` spawns a detached task that races
    ///   `crate::database::create_pool` against the cancellation token and
    ///   reports the outcome back to the actor as a message.
    /// - `DatabasePoolConnected` stores the pool in the actor state and, when a
    ///   shared slot was supplied, in that slot as well.
    /// - `DatabasePoolConnectionFailed` clears `connecting` and logs the error.
    /// - `before_stop` cancels the token and closes the pool if one is held.
    ///
    /// # Errors
    ///
    /// Nothing in this function currently produces an error; it always returns
    /// `Ok` with the handle of the started actor.
    pub async fn spawn(
        runtime: &mut ActorRuntime,
        config: crate::config::DatabaseConfig,
        shared_pool: Option<SharedDbPool>,
    ) -> anyhow::Result<ActorHandle> {
        let mut agent = runtime.new_actor::<DatabasePoolState>();
        let cancel_token = CancellationToken::new();
        agent.model.config = Some(config);
        agent.model.connecting = true;
        agent.model.shared_pool = shared_pool;
        agent.model.cancel_token = Some(cancel_token);

        agent.mutate_on::<DatabasePoolConnected>(|agent, envelope| {
            let pool = envelope.message().pool.clone();
            agent.model.pool = Some(pool.clone());
            agent.model.connecting = false;
            let shared_pool = agent.model.shared_pool.clone();
            Reply::pending(async move {
                if let Some(shared) = shared_pool {
                    *shared.write().await = Some(pool);
                    tracing::info!("Database pool connected and stored in shared state");
                } else {
                    tracing::info!("Database pool connected (no shared state)");
                }
            })
        });

        agent.mutate_on::<DatabasePoolConnectionFailed>(|agent, envelope| {
            let error_msg = envelope.message().error.clone();
            agent.model.connecting = false;
            tracing::error!("Database pool connection failed: {}", error_msg);
            Reply::ready()
        });

        agent.after_start(|agent| {
            let config = agent.model.config.clone();
            let cancel_token = agent.model.cancel_token.clone();
            let self_handle = agent.handle().clone();
            if let Some(cfg) = config {
                tracing::info!("Database pool agent starting, connecting to database...");
                // Detached on purpose: the hook returns `Reply::ready()` right
                // away and the result arrives later as a message.
                tokio::spawn(async move {
                    tokio::select! {
                        // `biased` polls the cancellation branch first.
                        biased;
                        () = async {
                            if let Some(ref token) = cancel_token {
                                token.cancelled().await;
                            } else {
                                // Without a token this branch must never win.
                                std::future::pending::<()>().await;
                            }
                        } => {
                            tracing::info!("Database connection cancelled during shutdown");
                            self_handle
                                .send(DatabasePoolConnectionFailed {
                                    error: "Connection cancelled during shutdown".to_string(),
                                })
                                .await;
                        }
                        result = crate::database::create_pool(&cfg) => {
                            match result {
                                Ok(pool) => {
                                    self_handle.send(DatabasePoolConnected { pool }).await;
                                }
                                Err(e) => {
                                    self_handle
                                        .send(DatabasePoolConnectionFailed {
                                            error: e.to_string(),
                                        })
                                        .await;
                                }
                            }
                        }
                    }
                });
            }
            Reply::ready()
        });

        agent.before_stop(|agent| {
            let pool = agent.model.pool.clone();
            let cancel_token = agent.model.cancel_token.clone();
            Reply::pending(async move {
                if let Some(token) = cancel_token {
                    token.cancel();
                    tracing::debug!("Database connection retry cancelled");
                }
                if let Some(p) = pool {
                    tracing::info!("Database pool agent stopping, closing connections...");
                    // The close still runs in its own task; the difference is
                    // that a failed task is now reported instead of being
                    // followed by an unconditional "closed" message.
                    match tokio::spawn(async move { p.close().await }).await {
                        Ok(_) => {
                            tracing::info!("Database pool closed");
                        }
                        Err(join_err) => {
                            tracing::warn!(
                                "Database pool close task did not complete: {}",
                                join_err
                            );
                        }
                    }
                }
            })
        });

        let handle = agent.start().await;
        Ok(handle)
    }
}
#[cfg(feature = "cache")]
use super::messages::{RedisPoolConnected, RedisPoolConnectionFailed};
#[cfg(all(feature = "cache", not(feature = "database")))]
use acton_reactive::prelude::*;
#[cfg(all(feature = "cache", not(feature = "database")))]
use std::sync::Arc;
#[cfg(all(feature = "cache", not(feature = "database")))]
use tokio::sync::RwLock;
/// Shared, lock-guarded slot for an optional `deadpool_redis` pool.
///
/// When a slot is handed to [`RedisPoolAgent::spawn`], the agent writes the
/// connected pool into it while handling `RedisPoolConnected`.
#[cfg(feature = "cache")]
pub type SharedRedisPool = Arc<RwLock<Option<deadpool_redis::Pool>>>;
/// Actor state for the actor created by [`RedisPoolAgent::spawn`].
#[cfg(feature = "cache")]
#[derive(Debug, Default)]
pub struct RedisPoolState {
/// Pool stored by the `RedisPoolConnected` handler.
pub pool: Option<deadpool_redis::Pool>,
/// Configuration passed to `crate::cache::create_pool`; set in `spawn`.
pub config: Option<crate::config::RedisConfig>,
/// Set to `true` in `spawn`; cleared by the connected and failed handlers.
pub connecting: bool,
/// Optional external slot that also receives the pool once connected.
pub shared_pool: Option<SharedRedisPool>,
}
/// Field-less type whose associated function [`RedisPoolAgent::spawn`]
/// builds and starts the actor that owns the Redis pool.
#[cfg(feature = "cache")]
pub struct RedisPoolAgent;
#[cfg(feature = "cache")]
impl RedisPoolAgent {
    /// Builds the Redis pool actor, installs its handlers and starts it.
    ///
    /// The actor state receives `config` and `shared_pool`, and `connecting`
    /// is set to `true`. Once started, the `after_start` hook runs
    /// `crate::cache::create_pool` in a spawned task, awaits it, and sends the
    /// actor either `RedisPoolConnected` or `RedisPoolConnectionFailed`.
    ///
    /// # Errors
    ///
    /// Nothing in this function currently produces an error; it always returns
    /// `Ok` with the handle of the started actor.
    pub async fn spawn(
        runtime: &mut ActorRuntime,
        config: crate::config::RedisConfig,
        shared_pool: Option<SharedRedisPool>,
    ) -> anyhow::Result<ActorHandle> {
        let mut agent = runtime.new_actor::<RedisPoolState>();
        agent.model.shared_pool = shared_pool;
        agent.model.connecting = true;
        agent.model.config = Some(config);

        // Success: remember the pool locally and publish it to the shared slot.
        agent.mutate_on::<RedisPoolConnected>(|agent, envelope| {
            let connected = envelope.message().pool.clone();
            let slot = agent.model.shared_pool.clone();
            agent.model.connecting = false;
            agent.model.pool = Some(connected.clone());
            Reply::pending(async move {
                match slot {
                    Some(shared) => {
                        *shared.write().await = Some(connected);
                        tracing::info!("Redis pool connected and stored in shared state");
                    }
                    None => {
                        tracing::info!("Redis pool connected (no shared state)");
                    }
                }
            })
        });

        // Failure: clear the flag and log the reason.
        agent.mutate_on::<RedisPoolConnectionFailed>(|agent, envelope| {
            agent.model.connecting = false;
            let reason = envelope.message().error.clone();
            tracing::error!("Redis pool connection failed: {}", reason);
            Reply::ready()
        });

        agent.after_start(|agent| {
            let handle = agent.handle().clone();
            let maybe_cfg = agent.model.config.clone();
            Reply::pending(async move {
                let cfg = match maybe_cfg {
                    Some(cfg) => cfg,
                    None => return,
                };
                tracing::info!("Redis pool agent starting, connecting to Redis...");
                let joined =
                    tokio::spawn(async move { crate::cache::create_pool(&cfg).await }).await;
                // Collapse "task failed" and "connect failed" into one error string.
                let outcome = match joined {
                    Ok(connect_result) => connect_result.map_err(|e| e.to_string()),
                    Err(join_err) => Err(format!("Connection task panicked: {}", join_err)),
                };
                match outcome {
                    Ok(pool) => {
                        handle.send(RedisPoolConnected { pool }).await;
                    }
                    Err(error) => {
                        handle.send(RedisPoolConnectionFailed { error }).await;
                    }
                }
            })
        });

        // Stop hook only logs; no pool teardown is performed here.
        agent.before_stop(|_agent| {
            Reply::pending(async move {
                tracing::info!("Redis pool agent stopping");
            })
        });

        let started = agent.start().await;
        Ok(started)
    }
}
#[cfg(feature = "events")]
use super::messages::{NatsClientConnected, NatsClientConnectionFailed};
#[cfg(all(feature = "events", not(feature = "database"), not(feature = "cache")))]
use acton_reactive::prelude::*;
#[cfg(all(feature = "events", not(feature = "database"), not(feature = "cache")))]
use std::sync::Arc;
#[cfg(all(feature = "events", not(feature = "database"), not(feature = "cache")))]
use tokio::sync::RwLock;
/// Shared, lock-guarded slot for an optional `async_nats` client.
///
/// When a slot is handed to [`NatsPoolAgent::spawn`], the agent writes the
/// connected client into it while handling `NatsClientConnected`.
#[cfg(feature = "events")]
pub type SharedNatsClient = Arc<RwLock<Option<async_nats::Client>>>;
/// Actor state for the actor created by [`NatsPoolAgent::spawn`].
#[cfg(feature = "events")]
#[derive(Debug, Default)]
pub struct NatsPoolState {
/// Client stored by the `NatsClientConnected` handler.
pub client: Option<async_nats::Client>,
/// Configuration passed to `crate::events::create_client`; set in `spawn`.
pub config: Option<crate::config::NatsConfig>,
/// Set to `true` in `spawn`; cleared by the connected and failed handlers.
pub connecting: bool,
/// Optional external slot that also receives the client once connected.
pub shared_client: Option<SharedNatsClient>,
}
/// Field-less type whose associated function [`NatsPoolAgent::spawn`]
/// builds and starts the actor that owns the NATS client.
#[cfg(feature = "events")]
pub struct NatsPoolAgent;
#[cfg(feature = "events")]
impl NatsPoolAgent {
    /// Builds the NATS client actor, installs its handlers and starts it.
    ///
    /// The actor state receives `config` and `shared_client`, and `connecting`
    /// is set to `true`. Once started, the `after_start` hook runs
    /// `crate::events::create_client` in a spawned task, awaits it, and sends
    /// the actor either `NatsClientConnected` or `NatsClientConnectionFailed`.
    ///
    /// On stop, the hook flushes the client (bounded by a timeout) before
    /// releasing its handle. Previously the hook only dropped a clone of the
    /// client, which by itself neither flushes nor closes anything.
    ///
    /// # Errors
    ///
    /// Nothing in this function currently produces an error; it always returns
    /// `Ok` with the handle of the started actor.
    pub async fn spawn(
        runtime: &mut ActorRuntime,
        config: crate::config::NatsConfig,
        shared_client: Option<SharedNatsClient>,
    ) -> anyhow::Result<ActorHandle> {
        /// Upper bound on how long the stop hook waits for the flush.
        const FLUSH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

        let mut agent = runtime.new_actor::<NatsPoolState>();
        agent.model.config = Some(config);
        agent.model.connecting = true;
        agent.model.shared_client = shared_client;

        agent.mutate_on::<NatsClientConnected>(|agent, envelope| {
            let client = envelope.message().client.clone();
            agent.model.client = Some(client.clone());
            agent.model.connecting = false;
            let shared_client = agent.model.shared_client.clone();
            Reply::pending(async move {
                if let Some(shared) = shared_client {
                    *shared.write().await = Some(client);
                    tracing::info!("NATS client connected and stored in shared state");
                } else {
                    tracing::info!("NATS client connected (no shared state)");
                }
            })
        });

        agent.mutate_on::<NatsClientConnectionFailed>(|agent, envelope| {
            let error_msg = envelope.message().error.clone();
            agent.model.connecting = false;
            tracing::error!("NATS client connection failed: {}", error_msg);
            Reply::ready()
        });

        agent.after_start(|agent| {
            let config = agent.model.config.clone();
            let self_handle = agent.handle().clone();
            Reply::pending(async move {
                if let Some(cfg) = config {
                    tracing::info!("NATS pool agent starting, connecting to NATS...");
                    let result =
                        tokio::spawn(async move { crate::events::create_client(&cfg).await }).await;
                    match result {
                        Ok(Ok(client)) => {
                            self_handle.send(NatsClientConnected { client }).await;
                        }
                        Ok(Err(e)) => {
                            self_handle
                                .send(NatsClientConnectionFailed {
                                    error: e.to_string(),
                                })
                                .await;
                        }
                        Err(e) => {
                            self_handle
                                .send(NatsClientConnectionFailed {
                                    error: format!("Connection task panicked: {}", e),
                                })
                                .await;
                        }
                    }
                }
            })
        });

        agent.before_stop(|agent| {
            let client = agent.model.client.clone();
            Reply::pending(async move {
                if let Some(c) = client {
                    tracing::info!("NATS pool agent stopping, closing connection...");
                    // Flush in a separate task (same pattern as the connect
                    // path) and bound it so an unreachable server cannot stall
                    // shutdown. The client handle is dropped inside the task.
                    let flushed = tokio::spawn(async move {
                        match tokio::time::timeout(FLUSH_TIMEOUT, c.flush()).await {
                            Ok(Ok(_)) => Ok(()),
                            Ok(Err(e)) => Err(e.to_string()),
                            Err(_) => Err("flush timed out".to_string()),
                        }
                    })
                    .await;
                    match flushed {
                        Ok(Ok(())) => {
                            tracing::debug!("NATS client flushed before shutdown");
                        }
                        Ok(Err(reason)) => {
                            tracing::warn!("NATS client flush before shutdown failed: {}", reason);
                        }
                        Err(join_err) => {
                            tracing::warn!("NATS client flush task did not complete: {}", join_err);
                        }
                    }
                    tracing::info!("NATS client closed");
                }
            })
        });

        let handle = agent.start().await;
        Ok(handle)
    }
}
#[cfg(feature = "turso")]
use super::messages::{TursoDbConnected, TursoDbConnectionFailed};
#[cfg(all(
feature = "turso",
not(feature = "database"),
not(feature = "events"),
not(feature = "cache")
))]
use acton_reactive::prelude::*;
#[cfg(all(
feature = "turso",
not(feature = "database"),
not(feature = "cache"),
not(feature = "events")
))]
use std::sync::Arc;
#[cfg(all(
feature = "turso",
not(feature = "database"),
not(feature = "cache"),
not(feature = "events")
))]
use tokio::sync::RwLock;
#[cfg(all(feature = "turso", not(feature = "database")))]
use tokio_util::sync::CancellationToken;
/// Shared, lock-guarded slot for an optional reference-counted `libsql` database.
///
/// When a slot is handed to [`TursoDbAgent::spawn`], the agent writes the
/// connected database into it while handling `TursoDbConnected`.
#[cfg(feature = "turso")]
pub type SharedTursoDb = Arc<RwLock<Option<Arc<libsql::Database>>>>;
/// Actor state for the actor created by [`TursoDbAgent::spawn`].
#[cfg(feature = "turso")]
#[derive(Debug, Default)]
pub struct TursoDbState {
/// Database stored by the `TursoDbConnected` handler.
pub db: Option<Arc<libsql::Database>>,
/// Configuration passed to `crate::turso::create_database`; set in `spawn`.
pub config: Option<crate::config::TursoConfig>,
/// Set to `true` in `spawn`; cleared by the connected and failed handlers.
pub connecting: bool,
/// Optional external slot that also receives the database once connected.
pub shared_db: Option<SharedTursoDb>,
/// Token cancelled in `before_stop` to abort an in-flight connection attempt.
pub cancel_token: Option<CancellationToken>,
}
/// Field-less type whose associated function [`TursoDbAgent::spawn`]
/// builds and starts the actor that owns the Turso/libsql database.
#[cfg(feature = "turso")]
pub struct TursoDbAgent;
#[cfg(feature = "turso")]
impl TursoDbAgent {
    /// Creates the Turso database actor, registers its handlers and starts it.
    ///
    /// The actor state is seeded with `config`, `shared_db` and a fresh
    /// [`CancellationToken`], and `connecting` is set to `true`.
    ///
    /// Wiring performed here:
    /// - `after_start` spawns a detached task that races
    ///   `crate::turso::create_database` against the cancellation token and
    ///   reports the outcome back to the actor as a message.
    /// - `TursoDbConnected` stores the database in the actor state and, when a
    ///   shared slot was supplied, in that slot as well.
    /// - `TursoDbConnectionFailed` clears `connecting` and logs the error.
    /// - `before_stop` cancels the token and logs.
    ///
    /// # Errors
    ///
    /// Nothing in this function currently produces an error; it always returns
    /// `Ok` with the handle of the started actor.
    pub async fn spawn(
        runtime: &mut ActorRuntime,
        config: crate::config::TursoConfig,
        shared_db: Option<SharedTursoDb>,
    ) -> anyhow::Result<ActorHandle> {
        let mut agent = runtime.new_actor::<TursoDbState>();
        let cancel_token = CancellationToken::new();
        agent.model.config = Some(config);
        agent.model.connecting = true;
        agent.model.shared_db = shared_db;
        agent.model.cancel_token = Some(cancel_token);

        agent.mutate_on::<TursoDbConnected>(|agent, envelope| {
            let db = envelope.message().db.clone();
            agent.model.db = Some(db.clone());
            agent.model.connecting = false;
            let shared_db = agent.model.shared_db.clone();
            Reply::pending(async move {
                if let Some(shared) = shared_db {
                    // `db` is owned by this future and not used afterwards, so
                    // it is moved into the slot rather than cloned again.
                    *shared.write().await = Some(db);
                    tracing::info!("Turso database connected and stored in shared state");
                } else {
                    tracing::info!("Turso database connected (no shared state)");
                }
            })
        });

        agent.mutate_on::<TursoDbConnectionFailed>(|agent, envelope| {
            let error_msg = envelope.message().error.clone();
            agent.model.connecting = false;
            tracing::error!("Turso database connection failed: {}", error_msg);
            Reply::ready()
        });

        agent.after_start(|agent| {
            let config = agent.model.config.clone();
            let cancel_token = agent.model.cancel_token.clone();
            let self_handle = agent.handle().clone();
            if let Some(cfg) = config {
                tracing::info!(
                    "Turso database agent starting, connecting to database (mode={:?})...",
                    cfg.mode
                );
                // Detached on purpose: the hook returns `Reply::ready()` right
                // away and the result arrives later as a message.
                tokio::spawn(async move {
                    tokio::select! {
                        // `biased` polls the cancellation branch first.
                        biased;
                        () = async {
                            if let Some(ref token) = cancel_token {
                                token.cancelled().await;
                            } else {
                                // Without a token this branch must never win.
                                std::future::pending::<()>().await;
                            }
                        } => {
                            tracing::info!("Turso connection cancelled during shutdown");
                            self_handle
                                .send(TursoDbConnectionFailed {
                                    error: "Connection cancelled during shutdown".to_string(),
                                })
                                .await;
                        }
                        result = crate::turso::create_database(&cfg) => {
                            match result {
                                Ok(db) => {
                                    self_handle.send(TursoDbConnected { db: Arc::new(db) }).await;
                                }
                                Err(e) => {
                                    self_handle
                                        .send(TursoDbConnectionFailed {
                                            error: e.to_string(),
                                        })
                                        .await;
                                }
                            }
                        }
                    }
                });
            }
            Reply::ready()
        });

        agent.before_stop(|agent| {
            let cancel_token = agent.model.cancel_token.clone();
            Reply::pending(async move {
                if let Some(token) = cancel_token {
                    token.cancel();
                    tracing::debug!("Turso connection retry cancelled");
                }
                tracing::info!("Turso database agent stopping");
            })
        });

        let handle = agent.start().await;
        Ok(handle)
    }
}
#[cfg(feature = "surrealdb")]
use super::messages::{SurrealDbConnected, SurrealDbConnectionFailed};
#[cfg(all(
feature = "surrealdb",
not(feature = "database"),
not(feature = "events"),
not(feature = "cache"),
not(feature = "turso")
))]
use acton_reactive::prelude::*;
#[cfg(all(
feature = "surrealdb",
not(feature = "database"),
not(feature = "cache"),
not(feature = "events"),
not(feature = "turso")
))]
use std::sync::Arc;
#[cfg(all(
feature = "surrealdb",
not(feature = "database"),
not(feature = "cache"),
not(feature = "events"),
not(feature = "turso")
))]
use tokio::sync::RwLock;
#[cfg(all(
feature = "surrealdb",
not(feature = "database"),
not(feature = "turso")
))]
use tokio_util::sync::CancellationToken;
/// Shared, lock-guarded slot for an optional reference-counted SurrealDB client.
///
/// When a slot is handed to [`SurrealDbAgent::spawn`], the agent writes the
/// connected client into it while handling `SurrealDbConnected`.
#[cfg(feature = "surrealdb")]
pub type SharedSurrealDb = Arc<RwLock<Option<Arc<crate::surrealdb_backend::SurrealClient>>>>;
/// Actor state for the actor created by [`SurrealDbAgent::spawn`].
#[cfg(feature = "surrealdb")]
#[derive(Debug, Default)]
pub struct SurrealDbState {
/// Client stored by the `SurrealDbConnected` handler.
pub client: Option<Arc<crate::surrealdb_backend::SurrealClient>>,
/// Configuration passed to `crate::surrealdb_backend::create_client`; set in `spawn`.
pub config: Option<crate::config::SurrealDbConfig>,
/// Set to `true` in `spawn`; cleared by the connected and failed handlers.
pub connecting: bool,
/// Optional external slot that also receives the client once connected.
pub shared_client: Option<SharedSurrealDb>,
/// Token cancelled in `before_stop` to abort an in-flight connection attempt.
pub cancel_token: Option<CancellationToken>,
}
/// Field-less type whose associated function [`SurrealDbAgent::spawn`]
/// builds and starts the actor that owns the SurrealDB client.
#[cfg(feature = "surrealdb")]
pub struct SurrealDbAgent;
#[cfg(feature = "surrealdb")]
impl SurrealDbAgent {
    /// Builds the SurrealDB actor, installs its handlers and starts it.
    ///
    /// The actor state receives `config`, `shared_client` and a new
    /// [`CancellationToken`]; `connecting` starts out `true`. After start, a
    /// detached task races `crate::surrealdb_backend::create_client` against
    /// the token and reports the result to the actor as either
    /// `SurrealDbConnected` or `SurrealDbConnectionFailed`. The stop hook
    /// cancels the token.
    ///
    /// # Errors
    ///
    /// Nothing in this function currently produces an error; it always returns
    /// `Ok` with the handle of the started actor.
    pub async fn spawn(
        runtime: &mut ActorRuntime,
        config: crate::config::SurrealDbConfig,
        shared_client: Option<SharedSurrealDb>,
    ) -> anyhow::Result<ActorHandle> {
        let mut agent = runtime.new_actor::<SurrealDbState>();
        agent.model.cancel_token = Some(CancellationToken::new());
        agent.model.shared_client = shared_client;
        agent.model.connecting = true;
        agent.model.config = Some(config);

        // Success: remember the client locally and publish it to the shared slot.
        agent.mutate_on::<SurrealDbConnected>(|agent, envelope| {
            let connected = envelope.message().client.clone();
            let slot = agent.model.shared_client.clone();
            agent.model.connecting = false;
            agent.model.client = Some(connected.clone());
            Reply::pending(async move {
                match slot {
                    Some(shared) => {
                        *shared.write().await = Some(connected);
                        tracing::info!("SurrealDB client connected and stored in shared state");
                    }
                    None => {
                        tracing::info!("SurrealDB client connected (no shared state)");
                    }
                }
            })
        });

        // Failure: clear the flag and log the reason.
        agent.mutate_on::<SurrealDbConnectionFailed>(|agent, envelope| {
            agent.model.connecting = false;
            let reason = envelope.message().error.clone();
            tracing::error!("SurrealDB client connection failed: {}", reason);
            Reply::ready()
        });

        agent.after_start(|agent| {
            let handle = agent.handle().clone();
            let token = agent.model.cancel_token.clone();
            if let Some(cfg) = agent.model.config.clone() {
                tracing::info!("SurrealDB agent starting, connecting to database...");
                tokio::spawn(async move {
                    // Resolves when the token fires; never resolves without a token.
                    let shutdown_requested = async {
                        match token.as_ref() {
                            Some(t) => t.cancelled().await,
                            None => std::future::pending::<()>().await,
                        }
                    };
                    tokio::select! {
                        // Cancellation is checked before the connect future.
                        biased;
                        () = shutdown_requested => {
                            tracing::info!("SurrealDB connection cancelled during shutdown");
                            let failure = SurrealDbConnectionFailed {
                                error: "Connection cancelled during shutdown".to_string(),
                            };
                            handle.send(failure).await;
                        }
                        attempt = crate::surrealdb_backend::create_client(&cfg) => {
                            match attempt {
                                Err(e) => {
                                    let failure = SurrealDbConnectionFailed {
                                        error: e.to_string(),
                                    };
                                    handle.send(failure).await;
                                }
                                Ok(client) => {
                                    let connected = SurrealDbConnected {
                                        client: Arc::new(client),
                                    };
                                    handle.send(connected).await;
                                }
                            }
                        }
                    }
                });
            }
            Reply::ready()
        });

        agent.before_stop(|agent| {
            let token = agent.model.cancel_token.clone();
            Reply::pending(async move {
                if let Some(t) = token.as_ref() {
                    t.cancel();
                    tracing::debug!("SurrealDB connection retry cancelled");
                }
                tracing::info!("SurrealDB agent stopping");
            })
        });

        let started = agent.start().await;
        Ok(started)
    }
}
#[cfg(feature = "clickhouse")]
use super::messages::{ClickHouseClientConnected, ClickHouseClientConnectionFailed};
#[cfg(all(
feature = "clickhouse",
not(feature = "database"),
not(feature = "events"),
not(feature = "cache"),
not(feature = "turso"),
not(feature = "surrealdb")
))]
use acton_reactive::prelude::*;
#[cfg(all(
feature = "clickhouse",
not(feature = "database"),
not(feature = "cache"),
not(feature = "events"),
not(feature = "turso"),
not(feature = "surrealdb")
))]
use std::sync::Arc;
#[cfg(all(
feature = "clickhouse",
not(feature = "database"),
not(feature = "cache"),
not(feature = "events"),
not(feature = "turso"),
not(feature = "surrealdb")
))]
use tokio::sync::RwLock;
/// Shared, lock-guarded slot for an optional `clickhouse` client.
///
/// When a slot is handed to [`ClickHousePoolAgent::spawn`], the agent writes
/// the connected client into it while handling `ClickHouseClientConnected`.
#[cfg(feature = "clickhouse")]
pub type SharedClickHouseClient = Arc<RwLock<Option<clickhouse::Client>>>;
/// Actor state for the actor created by [`ClickHousePoolAgent::spawn`].
///
/// `Debug` is implemented by hand for this type rather than derived.
#[cfg(feature = "clickhouse")]
#[derive(Default)]
pub struct ClickHouseClientState {
/// Client stored by the `ClickHouseClientConnected` handler.
pub client: Option<clickhouse::Client>,
/// Configuration passed to `crate::clickhouse_backend::create_client`; set in `spawn`.
pub config: Option<crate::config::ClickHouseConfig>,
/// Set to `true` in `spawn`; cleared by the connected and failed handlers.
pub connecting: bool,
/// Optional external slot that also receives the client once connected.
pub shared_client: Option<SharedClickHouseClient>,
}
#[cfg(feature = "clickhouse")]
impl std::fmt::Debug for ClickHouseClientState {
    /// Formats the state without printing the client itself.
    ///
    /// `config` and `connecting` are printed directly. The client and the
    /// shared slot are reported only as presence flags (`has_client`,
    /// `has_shared_client`), so no field of the struct is silently missing
    /// from the output.
    // NOTE(review): presumably hand-written because `clickhouse::Client` is
    // not printed via `Debug` here — confirm before switching to a derive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ClickHouseClientState")
            .field("config", &self.config)
            .field("connecting", &self.connecting)
            .field("has_client", &self.client.is_some())
            .field("has_shared_client", &self.shared_client.is_some())
            .finish()
    }
}
/// Field-less type whose associated function [`ClickHousePoolAgent::spawn`]
/// builds and starts the actor that owns the ClickHouse client.
#[cfg(feature = "clickhouse")]
pub struct ClickHousePoolAgent;
#[cfg(feature = "clickhouse")]
impl ClickHousePoolAgent {
    /// Builds the ClickHouse client actor, installs its handlers and starts it.
    ///
    /// The actor state receives `config` and `shared_client`, and `connecting`
    /// is set to `true`. Once started, the `after_start` hook runs
    /// `crate::clickhouse_backend::create_client` in a spawned task, awaits
    /// it, and sends the actor either `ClickHouseClientConnected` or
    /// `ClickHouseClientConnectionFailed`.
    ///
    /// # Errors
    ///
    /// Nothing in this function currently produces an error; it always returns
    /// `Ok` with the handle of the started actor.
    pub async fn spawn(
        runtime: &mut ActorRuntime,
        config: crate::config::ClickHouseConfig,
        shared_client: Option<SharedClickHouseClient>,
    ) -> anyhow::Result<ActorHandle> {
        let mut agent = runtime.new_actor::<ClickHouseClientState>();
        agent.model.shared_client = shared_client;
        agent.model.connecting = true;
        agent.model.config = Some(config);

        // Success: remember the client locally and publish it to the shared slot.
        agent.mutate_on::<ClickHouseClientConnected>(|agent, envelope| {
            let connected = envelope.message().client.clone();
            let slot = agent.model.shared_client.clone();
            agent.model.connecting = false;
            agent.model.client = Some(connected.clone());
            Reply::pending(async move {
                match slot {
                    Some(shared) => {
                        *shared.write().await = Some(connected);
                        tracing::info!("ClickHouse client connected and stored in shared state");
                    }
                    None => {
                        tracing::info!("ClickHouse client connected (no shared state)");
                    }
                }
            })
        });

        // Failure: clear the flag and log the reason.
        agent.mutate_on::<ClickHouseClientConnectionFailed>(|agent, envelope| {
            agent.model.connecting = false;
            let reason = envelope.message().error.clone();
            tracing::error!("ClickHouse client connection failed: {}", reason);
            Reply::ready()
        });

        agent.after_start(|agent| {
            let handle = agent.handle().clone();
            let maybe_cfg = agent.model.config.clone();
            Reply::pending(async move {
                let cfg = match maybe_cfg {
                    Some(cfg) => cfg,
                    None => return,
                };
                tracing::info!("ClickHouse pool agent starting, connecting to ClickHouse...");
                let joined = tokio::spawn(async move {
                    crate::clickhouse_backend::create_client(&cfg).await
                })
                .await;
                // Collapse "task failed" and "connect failed" into one error string.
                let outcome = match joined {
                    Ok(connect_result) => connect_result.map_err(|e| e.to_string()),
                    Err(join_err) => Err(format!("Connection task panicked: {}", join_err)),
                };
                match outcome {
                    Ok(client) => {
                        handle.send(ClickHouseClientConnected { client }).await;
                    }
                    Err(error) => {
                        handle
                            .send(ClickHouseClientConnectionFailed { error })
                            .await;
                    }
                }
            })
        });

        agent.before_stop(|agent| {
            let held = agent.model.client.clone();
            Reply::pending(async move {
                // NOTE(review): this only drops the hook's own clone of the
                // client; the actor state keeps its copy.
                if let Some(c) = held {
                    tracing::info!("ClickHouse pool agent stopping, closing connection...");
                    drop(c);
                    tracing::info!("ClickHouse client closed");
                }
            })
        });

        let started = agent.start().await;
        Ok(started)
    }
}
#[cfg(test)]
mod tests {
    // Glob import kept even when no feature-gated test module is compiled.
    #[allow(unused_imports)]
    use super::*;

    /// Integration-style tests for `TursoDbAgent` using local database files.
    #[cfg(feature = "turso")]
    mod turso_agent_tests {
        use super::*;
        use crate::config::{TursoConfig, TursoMode};
        use std::path::{Path, PathBuf};
        use std::time::Duration;

        /// Longest time a test waits for the agent to publish its database.
        const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
        /// Interval between checks of the shared slot while waiting.
        const POLL_INTERVAL: Duration = Duration::from_millis(20);
        /// Fixed delay for tests that have no observable "connected" signal.
        const SETTLE_DELAY: Duration = Duration::from_millis(500);

        /// Returns a per-process database path in the system temp directory.
        fn temp_db_path(name: &str) -> PathBuf {
            std::env::temp_dir().join(format!(
                "turso_agent_test_{}_{}.db",
                name,
                std::process::id()
            ))
        }

        /// Best-effort removal of the database file and its `-wal`/`-shm` siblings.
        fn cleanup_db(path: &Path) {
            let _ = std::fs::remove_file(path);
            let _ = std::fs::remove_file(path.with_extension("db-wal"));
            let _ = std::fs::remove_file(path.with_extension("db-shm"));
        }

        /// Builds the local-mode configuration shared by most tests.
        fn local_config(path: &Path) -> TursoConfig {
            TursoConfig {
                mode: TursoMode::Local,
                path: Some(path.to_path_buf()),
                url: None,
                auth_token: None,
                sync_interval_secs: None,
                encryption_key: None,
                read_your_writes: true,
                max_retries: 0,
                retry_delay_secs: 1,
                optional: false,
                lazy_init: false,
            }
        }

        /// Polls `shared` until it holds a database or `CONNECT_TIMEOUT` passes.
        ///
        /// Returns `true` when a database became available in time. Polling
        /// replaces a fixed sleep so the tests finish as soon as the agent
        /// connects and tolerate slow machines.
        async fn wait_for_db(shared: &SharedTursoDb) -> bool {
            let deadline = tokio::time::Instant::now() + CONNECT_TIMEOUT;
            loop {
                if shared.read().await.is_some() {
                    return true;
                }
                if tokio::time::Instant::now() >= deadline {
                    return false;
                }
                tokio::time::sleep(POLL_INTERVAL).await;
            }
        }

        #[tokio::test]
        async fn test_turso_agent_spawn_and_connect() {
            let db_path = temp_db_path("agent_spawn");
            let config = local_config(&db_path);
            let shared_db: SharedTursoDb = Arc::new(RwLock::new(None));
            let mut runtime = acton_reactive::prelude::ActonApp::launch_async().await;
            let handle = TursoDbAgent::spawn(&mut runtime, config, Some(shared_db.clone()))
                .await
                .expect("Failed to spawn TursoDbAgent");
            assert!(
                wait_for_db(&shared_db).await,
                "Database should be available in shared storage"
            );
            let db_guard = shared_db.read().await;
            if let Some(ref db) = *db_guard {
                let conn = db.connect().expect("Failed to connect");
                conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY)", ())
                    .await
                    .expect("Failed to create table");
            }
            drop(db_guard);
            let _ = handle.stop().await;
            runtime
                .shutdown_all()
                .await
                .expect("Failed to shutdown runtime");
            cleanup_db(&db_path);
        }

        #[tokio::test]
        async fn test_turso_agent_graceful_shutdown() {
            let db_path = temp_db_path("agent_shutdown");
            let config = local_config(&db_path);
            let shared_db: SharedTursoDb = Arc::new(RwLock::new(None));
            let mut runtime = acton_reactive::prelude::ActonApp::launch_async().await;
            let handle = TursoDbAgent::spawn(&mut runtime, config, Some(shared_db.clone()))
                .await
                .expect("Failed to spawn TursoDbAgent");
            assert!(wait_for_db(&shared_db).await);
            {
                let db_guard = shared_db.read().await;
                if let Some(ref db) = *db_guard {
                    let conn = db.connect().expect("Failed to connect");
                    conn.execute("CREATE TABLE IF NOT EXISTS _check (id INTEGER)", ())
                        .await
                        .expect("Failed to create table");
                }
            }
            let _ = handle.stop().await;
            runtime
                .shutdown_all()
                .await
                .expect("Failed to shutdown runtime");
            assert!(
                db_path.exists(),
                "Database file should still exist after graceful shutdown"
            );
            cleanup_db(&db_path);
        }

        #[tokio::test]
        async fn test_turso_agent_without_shared_storage() {
            let db_path = temp_db_path("agent_no_shared");
            let config = local_config(&db_path);
            let mut runtime = acton_reactive::prelude::ActonApp::launch_async().await;
            let handle = TursoDbAgent::spawn(&mut runtime, config, None)
                .await
                .expect("Failed to spawn TursoDbAgent");
            // No shared slot to observe, so give the agent a fixed moment to run.
            tokio::time::sleep(SETTLE_DELAY).await;
            let _ = handle.stop().await;
            runtime
                .shutdown_all()
                .await
                .expect("Failed to shutdown runtime");
            cleanup_db(&db_path);
        }

        #[tokio::test]
        async fn test_turso_agent_missing_config_handling() {
            // Remote mode without a URL: the connection attempt is expected to fail.
            let config = TursoConfig {
                mode: TursoMode::Remote,
                path: None,
                url: None,
                auth_token: Some("some-token".to_string()),
                sync_interval_secs: None,
                encryption_key: None,
                read_your_writes: true,
                max_retries: 0,
                retry_delay_secs: 1,
                optional: true,
                lazy_init: false,
            };
            let shared_db: SharedTursoDb = Arc::new(RwLock::new(None));
            let mut runtime = acton_reactive::prelude::ActonApp::launch_async().await;
            let handle = TursoDbAgent::spawn(&mut runtime, config, Some(shared_db.clone()))
                .await
                .expect("Failed to spawn TursoDbAgent");
            // A negative result cannot be polled for; wait a fixed time instead.
            tokio::time::sleep(SETTLE_DELAY).await;
            assert!(
                shared_db.read().await.is_none(),
                "Database should not be available when URL is missing"
            );
            let _ = handle.stop().await;
            runtime
                .shutdown_all()
                .await
                .expect("Failed to shutdown runtime");
        }

        #[tokio::test]
        async fn test_turso_agent_multiple_agents() {
            let db_path1 = temp_db_path("multi_agent_1");
            let db_path2 = temp_db_path("multi_agent_2");
            let config1 = local_config(&db_path1);
            let config2 = local_config(&db_path2);
            let shared_db1: SharedTursoDb = Arc::new(RwLock::new(None));
            let shared_db2: SharedTursoDb = Arc::new(RwLock::new(None));
            let mut runtime = acton_reactive::prelude::ActonApp::launch_async().await;
            let handle1 = TursoDbAgent::spawn(&mut runtime, config1, Some(shared_db1.clone()))
                .await
                .expect("Failed to spawn first agent");
            let handle2 = TursoDbAgent::spawn(&mut runtime, config2, Some(shared_db2.clone()))
                .await
                .expect("Failed to spawn second agent");
            assert!(
                wait_for_db(&shared_db1).await,
                "First database should be available"
            );
            assert!(
                wait_for_db(&shared_db2).await,
                "Second database should be available"
            );
            {
                let db1 = shared_db1.read().await;
                let conn1 = db1.as_ref().unwrap().connect().unwrap();
                conn1
                    .execute("CREATE TABLE db1_table (id INTEGER)", ())
                    .await
                    .unwrap();
            }
            {
                let db2 = shared_db2.read().await;
                let conn2 = db2.as_ref().unwrap().connect().unwrap();
                conn2
                    .execute("CREATE TABLE db2_table (id INTEGER)", ())
                    .await
                    .unwrap();
            }
            let _ = handle1.stop().await;
            let _ = handle2.stop().await;
            runtime
                .shutdown_all()
                .await
                .expect("Failed to shutdown runtime");
            cleanup_db(&db_path1);
            cleanup_db(&db_path2);
        }
    }
}