lash-postgres-store 0.1.0-alpha.62

PostgreSQL-backed durable storage for the lash agent runtime.
Documentation
fn process_status_label(record: &ProcessRecord) -> &'static str {
    record.status.label()
}

async fn load_process_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    process_id: &str,
) -> Result<Option<ProcessRecord>, PluginError> {
    let json: Option<String> = sqlx::query_scalar(
        "SELECT record_json
             FROM lash_processes
             WHERE process_id = $1
             FOR UPDATE",
    )
    .bind(process_id)
    .fetch_optional(&mut **tx)
    .await
    .map_err(plugin_sqlx_error)?;
    json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
        .transpose()
}

async fn load_process(
    pool: &PgPool,
    process_id: &str,
) -> Result<Option<ProcessRecord>, PluginError> {
    let json: Option<String> =
        sqlx::query_scalar("SELECT record_json FROM lash_processes WHERE process_id = $1")
            .bind(process_id)
            .fetch_optional(pool)
            .await
            .map_err(plugin_sqlx_error)?;
    json.map(|json| serde_json::from_str(&json).map_err(process_decode_error))
        .transpose()
}

async fn save_process_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    record: &ProcessRecord,
) -> Result<(), PluginError> {
    sqlx::query(
        "UPDATE lash_processes
         SET updated_at_ms = $2, status = $3, record_json = $4
         WHERE process_id = $1",
    )
    .bind(&record.id)
    .bind(record.updated_at_ms as i64)
    .bind(process_status_label(record))
    .bind(serde_json::to_string(record).map_err(process_decode_error)?)
    .execute(&mut **tx)
    .await
    .map_err(plugin_sqlx_error)?;
    Ok(())
}

async fn load_event_by_key_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    process_id: &str,
    replay_key: &str,
) -> Result<Option<(String, ProcessEvent)>, PluginError> {
    let row = sqlx::query(
        "SELECT payload_hash, event_json
         FROM lash_process_events
         WHERE process_id = $1 AND idempotency_key = $2",
    )
    .bind(process_id)
    .bind(replay_key)
    .fetch_optional(&mut **tx)
    .await
    .map_err(plugin_sqlx_error)?;
    row.map(|row| {
        let hash: String = row.get(0);
        let json: String = row.get(1);
        serde_json::from_str(&json)
            .map(|event| (hash, event))
            .map_err(process_decode_error)
    })
    .transpose()
}

async fn load_process_lease_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
    process_id: &str,
) -> Result<Option<ProcessLease>, PluginError> {
    let row = sqlx::query(
        "SELECT lease_owner_id, lease_token, lease_fencing_token,
                lease_claimed_at_ms, lease_expires_at_ms
         FROM lash_process_leases
         WHERE process_id = $1",
    )
    .bind(process_id)
    .fetch_optional(&mut **tx)
    .await
    .map_err(plugin_sqlx_error)?;
    let Some(row) = row else {
        return Ok(None);
    };
    let owner_id: Option<String> = row.get(0);
    let lease_token: Option<String> = row.get(1);
    let (Some(owner_id), Some(lease_token)) = (owner_id, lease_token) else {
        return Ok(None);
    };
    Ok(Some(ProcessLease {
        schema_version: PROCESS_LEASE_SCHEMA_VERSION,
        process_id: process_id.to_string(),
        owner_id,
        lease_token,
        fencing_token: row.get::<i64, _>(2) as u64,
        claimed_at_epoch_ms: row.get::<i64, _>(3) as u64,
        expires_at_epoch_ms: row.get::<i64, _>(4) as u64,
    }))
}

fn process_lease_conflict(process_id: &str, current: &ProcessLease) -> PluginError {
    PluginError::Session(format!(
        "process `{process_id}` is already leased by `{}` until {}",
        current.owner_id, current.expires_at_epoch_ms
    ))
}

fn process_lease_expired(process_id: &str) -> PluginError {
    PluginError::Session(format!(
        "process lease for `{process_id}` is missing or expired"
    ))
}

fn guard_lease(current: Option<&ProcessLease>, lease_token: &str, now: u64) -> bool {
    current
        .map(|current| current.lease_token == lease_token && current.expires_at_epoch_ms > now)
        .unwrap_or(false)
}

async fn list_grants_for_scope(
    pool: &PgPool,
    session_scope: &SessionScope,
    live_only: bool,
) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
    let status_clause = if live_only {
        "AND p.status = 'running'"
    } else {
        ""
    };
    let sql = format!(
        "SELECT g.process_id, g.descriptor_json, p.record_json
         FROM lash_process_handle_grants g
         JOIN lash_processes p ON p.process_id = g.process_id
         WHERE g.scope_id = $1 {status_clause}
         ORDER BY g.process_id ASC"
    );
    let rows = sqlx::query(&sql)
        .bind(session_scope.id().as_str())
        .fetch_all(pool)
        .await
        .map_err(plugin_sqlx_error)?;
    let mut entries = Vec::new();
    for row in rows {
        let process_id: String = row.get(0);
        let descriptor_json: String = row.get(1);
        let record_json: String = row.get(2);
        let descriptor: ProcessHandleDescriptor =
            serde_json::from_str(&descriptor_json).map_err(process_decode_error)?;
        let record: ProcessRecord =
            serde_json::from_str(&record_json).map_err(process_decode_error)?;
        entries.push((
            ProcessHandleGrant {
                session_id: session_scope.session_id.clone(),
                process_id,
                descriptor,
            },
            record,
        ));
    }
    Ok(entries)
}