coil-runtime 0.1.0

HTTP runtime and request handling for the Coil framework.
Documentation
use std::sync::OnceLock;

use coil_data::{DataRuntime, PostgresDataClient};
use sqlx::Row;

use super::*;

#[derive(Debug, Clone)]
pub(super) struct SharedWebhookObservationStore {
    runtime: DataRuntime,
    client: OnceLock<Result<PostgresDataClient, String>>,
    schema: String,
    initialized: OnceLock<Result<(), String>>,
}

impl SharedWebhookObservationStore {
    pub(super) fn open(runtime: DataRuntime) -> Self {
        let schema = runtime.schema.clone();
        Self {
            runtime,
            client: OnceLock::new(),
            schema,
            initialized: OnceLock::new(),
        }
    }

    pub(super) fn location_label(&self) -> String {
        format!(
            "shared-postgres:{}.webhook_observation_entries",
            self.schema
        )
    }

    pub(super) fn insert(&self, record: &WebhookObservationEvent) -> Result<(), String> {
        self.ensure_initialized()?;
        let client = self.client()?.clone();
        let table = self.qualified_table();
        let record = record.clone();
        run_blocking(async move {
            sqlx::query(&format!(
                "INSERT INTO {} (recorded_at_unix_seconds, app_id, source, event, status, trace_id, principal_kind, principal_id, detail) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
                table
            ))
            .bind(record.recorded_at_unix_seconds)
            .bind(&record.app_id)
            .bind(&record.source)
            .bind(&record.event)
            .bind(record.status.as_str())
            .bind(&record.trace_id)
            .bind(&record.principal_kind)
            .bind(&record.principal_id)
            .bind(&record.detail)
            .execute(&client.pool)
            .await
            .map_err(|error| format!("failed to write shared webhook observation entry: {error}"))?;
            Ok(())
        })
    }

    pub(super) fn count(&self) -> Result<usize, String> {
        self.ensure_initialized()?;
        let client = self.client()?.clone();
        let table = self.qualified_table();
        run_blocking(async move {
            let count: i64 = sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {}", table))
                .fetch_one(&client.pool)
                .await
                .map_err(|error| {
                    format!("failed to count shared webhook observation entries: {error}")
                })?;
            usize::try_from(count)
                .map_err(|_| "shared webhook observation entry count overflowed usize".to_string())
        })
    }

    pub(super) fn status_counts(&self) -> Result<WebhookObservationStatusCounts, String> {
        self.ensure_initialized()?;
        let client = self.client()?.clone();
        let table = self.qualified_table();
        run_blocking(async move {
            let rows = sqlx::query(&format!(
                "SELECT status, COUNT(*) FROM {} GROUP BY status",
                table
            ))
            .fetch_all(&client.pool)
            .await
            .map_err(|error| {
                format!("failed to query shared webhook observation counts: {error}")
            })?;
            let decoded = rows
                .into_iter()
                .map(|row| {
                    let status: String = row.try_get(0).map_err(|error| {
                        format!("failed to decode shared webhook observation status: {error}")
                    })?;
                    let count: i64 = row.try_get(1).map_err(|error| {
                        format!("failed to decode shared webhook observation count: {error}")
                    })?;
                    Ok((status, count))
                })
                .collect::<Result<Vec<_>, String>>()?;
            decode_status_counts(decoded)
        })
    }

    pub(super) fn recent(&self, limit: usize) -> Result<Vec<WebhookObservationEvent>, String> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        self.ensure_initialized()?;
        let client = self.client()?.clone();
        let table = self.qualified_table();
        run_blocking(async move {
            let rows = sqlx::query(&format!(
                "SELECT id, recorded_at_unix_seconds, app_id, source, event, status, trace_id, principal_kind, principal_id, detail FROM {} ORDER BY recorded_at_unix_seconds DESC, id DESC LIMIT $1",
                table
            ))
            .bind(limit as i64)
            .fetch_all(&client.pool)
            .await
            .map_err(|error| format!("failed to query shared webhook observation entries: {error}"))?;
            let mut records = rows
                .into_iter()
                .map(|row| {
                    let status: String = row.try_get(5).map_err(|error| {
                        format!("failed to decode shared webhook observation status: {error}")
                    })?;
                    Ok(WebhookObservationEvent {
                        id: row.try_get(0).map_err(|error| {
                            format!("failed to decode shared webhook observation id: {error}")
                        })?,
                        recorded_at_unix_seconds: row.try_get(1).map_err(|error| {
                            format!(
                                "failed to decode shared webhook observation timestamp: {error}"
                            )
                        })?,
                        app_id: row.try_get(2).map_err(|error| {
                            format!("failed to decode shared webhook observation app id: {error}")
                        })?,
                        source: row.try_get(3).map_err(|error| {
                            format!("failed to decode shared webhook observation source: {error}")
                        })?,
                        event: row.try_get(4).map_err(|error| {
                            format!("failed to decode shared webhook observation event: {error}")
                        })?,
                        status: WebhookObservationStatus::from_db_value(&status)?,
                        trace_id: row.try_get(6).map_err(|error| {
                            format!("failed to decode shared webhook observation trace id: {error}")
                        })?,
                        principal_kind: row.try_get(7).map_err(|error| {
                            format!(
                                "failed to decode shared webhook observation principal kind: {error}"
                            )
                        })?,
                        principal_id: row.try_get(8).map_err(|error| {
                            format!(
                                "failed to decode shared webhook observation principal id: {error}"
                            )
                        })?,
                        detail: row.try_get(9).map_err(|error| {
                            format!("failed to decode shared webhook observation detail: {error}")
                        })?,
                    })
                })
                .collect::<Result<Vec<_>, String>>()?;
            records.reverse();
            Ok(records)
        })
    }

    pub(super) fn claim_delivery(
        &self,
        app_id: &str,
        route_name: &str,
        source: &str,
        delivery_id: &str,
        request_id: &str,
        recorded_at_unix_seconds: i64,
    ) -> Result<bool, String> {
        self.ensure_initialized()?;
        let client = self.client()?.clone();
        let table = self.qualified_delivery_table();
        let app_id = app_id.to_string();
        let route_name = route_name.to_string();
        let source = source.to_string();
        let delivery_id = delivery_id.to_string();
        let request_id = request_id.to_string();
        run_blocking(async move {
            let rows = sqlx::query(&format!(
                "INSERT INTO {} (app_id, route_name, source, delivery_id, first_seen_request_id, first_seen_at_unix_seconds) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (app_id, source, delivery_id) DO NOTHING",
                table
            ))
            .bind(&app_id)
            .bind(&route_name)
            .bind(&source)
            .bind(&delivery_id)
            .bind(&request_id)
            .bind(recorded_at_unix_seconds)
            .execute(&client.pool)
            .await
            .map_err(|error| format!("failed to persist shared verified webhook delivery: {error}"))?
            .rows_affected();
            Ok(rows > 0)
        })
    }

    fn client(&self) -> Result<&PostgresDataClient, String> {
        self.client
            .get_or_init(|| {
                self.runtime
                    .connect_lazy_postgres()
                    .map_err(|error| error.to_string())
            })
            .as_ref()
            .map_err(|error| error.clone())
    }

    fn ensure_initialized(&self) -> Result<(), String> {
        let schema_ident = quote_identifier(&self.schema);
        self.initialized
            .get_or_init(|| {
                let client = self.client()?.clone();
                run_blocking(async move {
                    sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {schema_ident}"))
                        .execute(&client.pool)
                        .await
                        .map_err(|error| {
                            format!(
                                "failed to initialize shared webhook observation schema: {error}"
                            )
                        })?;
                    sqlx::query(&format!(
                        "CREATE TABLE IF NOT EXISTS {schema_ident}.webhook_observation_entries (
                            id BIGSERIAL PRIMARY KEY,
                            recorded_at_unix_seconds BIGINT NOT NULL,
                            app_id TEXT NOT NULL,
                            source TEXT NOT NULL,
                            event TEXT NOT NULL,
                            status TEXT NOT NULL,
                            trace_id TEXT NOT NULL,
                            principal_kind TEXT NOT NULL,
                            principal_id TEXT,
                            detail TEXT
                        )"
                    ))
                    .execute(&client.pool)
                    .await
                    .map_err(|error| format!(
                        "failed to initialize shared webhook observation table: {error}"
                    ))?;
                    sqlx::query(&format!(
                        "CREATE INDEX IF NOT EXISTS webhook_observation_entries_recent
                            ON {schema_ident}.webhook_observation_entries (recorded_at_unix_seconds DESC, id DESC)"
                    ))
                    .execute(&client.pool)
                    .await
                    .map_err(|error| format!(
                        "failed to initialize shared webhook observation recent index: {error}"
                    ))?;
                    sqlx::query(&format!(
                        "CREATE INDEX IF NOT EXISTS webhook_observation_entries_lookup
                            ON {schema_ident}.webhook_observation_entries (source, event, status)"
                    ))
                    .execute(&client.pool)
                    .await
                    .map_err(|error| format!(
                        "failed to initialize shared webhook observation lookup index: {error}"
                    ))?;
                    sqlx::query(&format!(
                        "CREATE TABLE IF NOT EXISTS {schema_ident}.verified_webhook_deliveries (
                            app_id TEXT NOT NULL,
                            route_name TEXT NOT NULL,
                            source TEXT NOT NULL,
                            delivery_id TEXT NOT NULL,
                            first_seen_request_id TEXT NOT NULL,
                            first_seen_at_unix_seconds BIGINT NOT NULL,
                            PRIMARY KEY (app_id, source, delivery_id)
                        )"
                    ))
                    .execute(&client.pool)
                    .await
                    .map_err(|error| format!(
                        "failed to initialize shared verified webhook delivery table: {error}"
                    ))?;
                    Ok(())
                })
            })
            .clone()
    }

    fn qualified_table(&self) -> String {
        format!(
            "{}.{}",
            quote_identifier(&self.schema),
            quote_identifier("webhook_observation_entries")
        )
    }

    fn qualified_delivery_table(&self) -> String {
        format!(
            "{}.{}",
            quote_identifier(&self.schema),
            quote_identifier("verified_webhook_deliveries")
        )
    }
}

fn quote_identifier(identifier: &str) -> String {
    format!("\"{}\"", identifier.replace('"', "\"\""))
}

fn run_blocking<T, F>(future: F) -> Result<T, String>
where
    T: Send + 'static,
    F: std::future::Future<Output = Result<T, String>> + Send + 'static,
{
    match tokio::runtime::Handle::try_current() {
        Ok(handle) => match handle.runtime_flavor() {
            tokio::runtime::RuntimeFlavor::MultiThread => {
                tokio::task::block_in_place(|| handle.block_on(future))
            }
            tokio::runtime::RuntimeFlavor::CurrentThread => run_future_on_dedicated_runtime(future),
            _ => run_future_on_dedicated_runtime(future),
        },
        Err(_) => run_future_on_ephemeral_runtime(future),
    }
}

fn run_future_on_dedicated_runtime<T, F>(future: F) -> Result<T, String>
where
    T: Send + 'static,
    F: std::future::Future<Output = Result<T, String>> + Send + 'static,
{
    std::thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .map_err(|error| error.to_string())?;
        runtime.block_on(future)
    })
    .join()
    .map_err(|_| "shared webhook worker thread panicked".to_string())?
}

fn run_future_on_ephemeral_runtime<T, F>(future: F) -> Result<T, String>
where
    T: Send + 'static,
    F: std::future::Future<Output = Result<T, String>> + Send + 'static,
{
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|error| error.to_string())?;
    runtime.block_on(future)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn shared_webhook_run_blocking_works_inside_current_thread_runtime() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        let value = runtime.block_on(async { run_blocking(async { Ok::<_, String>(11usize) }) });

        assert_eq!(value.unwrap(), 11);
    }
}