plane 0.5.5

Session backend orchestrator for ambitious browser-based apps.
Documentation
use super::{connect::ConnectError, subscribe::emit_with_key};
use plane_common::{
    names::{BackendActionName, BackendName, Name},
    protocol::{BackendAction, BackendActionMessage},
    types::NodeId,
};
use sqlx::{PgPool, Postgres};

pub struct BackendActionDatabase {
    pool: PgPool,
}

impl BackendActionDatabase {
    pub fn new(pool: &PgPool) -> Self {
        // TODO: can't use a reference because the futures need to be Send.
        Self { pool: pool.clone() }
    }

    pub async fn ack_pending_action(
        &self,
        notification_id: &BackendActionName,
        drone_id: NodeId,
    ) -> anyhow::Result<()> {
        sqlx::query!(
            r#"
            update "backend_action"
            set acked_at = now()
            where "id" = $1
            and "drone_id" = $2
            "#,
            notification_id.to_string(),
            drone_id.as_i32(),
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    pub async fn pending_actions(
        &self,
        drone: NodeId,
        limit: i64,
    ) -> anyhow::Result<Vec<BackendActionMessage>> {
        let rows = sqlx::query!(
            r#"
            select "action"
            from "backend_action"
            where "drone_id" = $1
            and acked_at is null
            order by created_at asc
            limit $2
            "#,
            drone.as_i32(),
            limit,
        )
        .fetch_all(&self.pool)
        .await?;

        let mut actions = Vec::new();

        for row in rows {
            let action: BackendActionMessage = serde_json::from_value(row.action)?;
            actions.push(action);
        }

        Ok(actions)
    }

    pub async fn create_pending_action(
        &self,
        backend_id: &BackendName,
        drone_id: NodeId,
        action: &BackendAction,
    ) -> Result<(), ConnectError> {
        let mut txn = self.pool.begin().await?;
        create_pending_action(&mut txn, backend_id, drone_id, action).await?;
        txn.commit().await?;
        Ok(())
    }
}

pub async fn create_pending_action(
    txn: &mut sqlx::Transaction<'_, Postgres>,
    backend_id: &BackendName,
    drone_id: NodeId,
    action: &BackendAction,
) -> Result<(), ConnectError> {
    let action_id = BackendActionName::new_random();

    let backend_action = BackendActionMessage {
        action_id: action_id.clone(),
        backend_id: backend_id.clone(),
        drone_id,
        action: action.clone(),
    };

    emit_with_key(&mut *txn, &drone_id.to_string(), &backend_action).await?;

    sqlx::query!(
        r#"
        insert into "backend_action" ("id", "action", "drone_id", "backend_id")
        values ($1, $2, $3, $4)
        "#,
        action_id.to_string(),
        serde_json::to_value(&backend_action).expect("Backend action is always serializable"),
        drone_id.as_i32(),
        backend_id.to_string(),
    )
    .execute(&mut **txn)
    .await?;

    Ok(())
}