use chrono::{DateTime, TimeZone, Utc};
use plane_core::{
messages::agent::{BackendState, SpawnRequest},
types::BackendId,
};
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use sqlx::{migrate, Result, SqlitePool};
use std::{path::Path, str::FromStr};
/// Handle to the SQLite database used by this module.
///
/// All queries defined on this type are executed against the wrapped
/// [`SqlitePool`].
#[allow(unused)]
#[derive(Clone, Debug)]
pub struct DroneDatabase {
/// Connection pool that every query on this type runs against.
pool: SqlitePool,
}
/// One row of the `backend` table, decoded into typed values.
pub struct Backend {
/// Identifier of the backend, built from the row's `name` column.
pub backend_id: BackendId,
/// State of the backend, parsed from the row's `state` column.
pub state: BackendState,
/// Spawn request, deserialized from the JSON stored in the `spec` column.
pub spec: SpawnRequest,
}
#[allow(unused)]
impl DroneDatabase {
pub async fn new(db_path: &Path) -> Result<DroneDatabase> {
let co = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePoolOptions::new().connect_with(co).await?;
migrate!("./migrations").run(&pool).await?;
let connection = DroneDatabase { pool };
Ok(connection)
}
pub async fn insert_backend(&self, spec: &SpawnRequest) -> Result<()> {
let backend_id = spec.backend_id.id().to_string();
let spec =
serde_json::to_string(&spec).expect("SpawnRequest serialization should never fail.");
sqlx::query!(
r"
insert into backend
(name, spec, state)
values
(?, ?, 'Loading')
",
backend_id,
spec,
)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn get_backends(&self) -> anyhow::Result<Vec<Backend>> {
sqlx::query!(
r"
select name, spec, state
from backend
"
)
.fetch_all(&self.pool)
.await?
.iter()
.map(|d| {
Ok(Backend {
backend_id: BackendId::new(d.name.clone()),
spec: serde_json::from_str(&d.spec)?,
state: BackendState::from_str(&d.state)?,
})
})
.collect()
}
pub async fn update_backend_state(
&self,
backend: &BackendId,
state: BackendState,
) -> Result<()> {
let state = state.to_string();
sqlx::query!(
r"
update backend
set state = ?
",
state
)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn get_proxy_route(&self, subdomain: &str) -> Result<Option<String>> {
Ok(sqlx::query!(
r"
select address
from route
where subdomain = ?
",
subdomain
)
.fetch_optional(&self.pool)
.await?
.map(|d| d.address))
}
pub async fn insert_proxy_route(
&self,
backend: &BackendId,
subdomain: &str,
address: &str,
) -> Result<()> {
let backend_id = backend.id().to_string();
sqlx::query!(
r"
insert into route
(backend, subdomain, address, last_active)
values
(?, ?, ?, unixepoch())
",
backend_id,
subdomain,
address
)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn reset_last_active_times(&self, subdomains: &[String]) -> Result<()> {
for subdomain in subdomains {
sqlx::query!(
r"
update route
set last_active = unixepoch()
where subdomain = ?
",
subdomain
)
.execute(&self.pool)
.await?;
}
Ok(())
}
pub async fn get_backend_last_active(&self, backend: &BackendId) -> Result<DateTime<Utc>> {
let backend_id = backend.id();
let time = sqlx::query!(
r#"
select last_active
from route
where backend = ?
"#,
backend_id
)
.fetch_one(&self.pool)
.await?
.last_active;
Ok(Utc.timestamp(time, 0))
}
}