coil-runtime 0.1.0

HTTP runtime and request handling for the Coil framework.
Documentation
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use rusqlite::{Connection, params};

use super::*;

#[derive(Debug, Clone)]
pub(super) struct LocalWebhookObservationStore {
    path: PathBuf,
    connection: Arc<Mutex<Connection>>,
}

impl LocalWebhookObservationStore {
    pub(super) fn open(root: PathBuf, namespace: String) -> Self {
        let path = database_path(&root, &namespace);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).unwrap_or_else(|error| {
                panic!(
                    "failed to create webhook observation directory `{}`: {error}",
                    parent.display()
                )
            });
        }

        let connection = Connection::open(&path).unwrap_or_else(|error| {
            panic!(
                "failed to open local webhook observation store `{}`: {error}",
                path.display()
            )
        });
        connection
            .execute_batch(
                r#"
                PRAGMA journal_mode = WAL;
                PRAGMA synchronous = FULL;
                CREATE TABLE IF NOT EXISTS webhook_observation_entries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    recorded_at_unix_seconds INTEGER NOT NULL,
                    app_id TEXT NOT NULL,
                    source TEXT NOT NULL,
                    event TEXT NOT NULL,
                    status TEXT NOT NULL,
                    trace_id TEXT NOT NULL,
                    principal_kind TEXT NOT NULL,
                    principal_id TEXT,
                    detail TEXT
                );
                CREATE INDEX IF NOT EXISTS webhook_observation_entries_recent
                    ON webhook_observation_entries (recorded_at_unix_seconds DESC, id DESC);
                CREATE INDEX IF NOT EXISTS webhook_observation_entries_lookup
                    ON webhook_observation_entries (source, event, status);
                CREATE TABLE IF NOT EXISTS verified_webhook_deliveries (
                    app_id TEXT NOT NULL,
                    route_name TEXT NOT NULL,
                    source TEXT NOT NULL,
                    delivery_id TEXT NOT NULL,
                    first_seen_request_id TEXT NOT NULL,
                    first_seen_at_unix_seconds INTEGER NOT NULL,
                    PRIMARY KEY (app_id, source, delivery_id)
                );
                "#,
            )
            .unwrap_or_else(|error| {
                panic!(
                    "failed to initialize local webhook observation store `{}`: {error}",
                    path.display()
                )
            });

        Self {
            path,
            connection: Arc::new(Mutex::new(connection)),
        }
    }

    pub(super) fn location_label(&self) -> String {
        format!("local-sqlite:{}", self.path.display())
    }

    pub(super) fn path(&self) -> &Path {
        self.path.as_path()
    }

    pub(super) fn insert(&self, record: &WebhookObservationEvent) -> Result<(), String> {
        let mut connection = self
            .connection
            .lock()
            .map_err(|_| "webhook observation store is poisoned".to_string())?;
        let tx = connection.transaction().map_err(|error| {
            format!("failed to start local webhook observation transaction: {error}")
        })?;
        tx.execute(
            r#"
            INSERT INTO webhook_observation_entries (
                recorded_at_unix_seconds,
                app_id,
                source,
                event,
                status,
                trace_id,
                principal_kind,
                principal_id,
                detail
            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
            "#,
            params![
                record.recorded_at_unix_seconds,
                &record.app_id,
                &record.source,
                &record.event,
                record.status.as_str(),
                &record.trace_id,
                &record.principal_kind,
                &record.principal_id,
                &record.detail,
            ],
        )
        .map_err(|error| format!("failed to write local webhook observation entry: {error}"))?;
        tx.commit().map_err(|error| {
            format!("failed to commit local webhook observation entry: {error}")
        })?;
        Ok(())
    }

    pub(super) fn count(&self) -> Result<usize, String> {
        let connection = self
            .connection
            .lock()
            .map_err(|_| "webhook observation store is poisoned".to_string())?;
        let count: i64 = connection
            .query_row(
                "SELECT COUNT(*) FROM webhook_observation_entries",
                [],
                |row| row.get(0),
            )
            .map_err(|error| {
                format!("failed to count local webhook observation entries: {error}")
            })?;
        usize::try_from(count)
            .map_err(|_| "local webhook observation entry count overflowed usize".to_string())
    }

    pub(super) fn status_counts(&self) -> Result<WebhookObservationStatusCounts, String> {
        let connection = self
            .connection
            .lock()
            .map_err(|_| "webhook observation store is poisoned".to_string())?;
        let mut statement = connection
            .prepare(
                r#"
                SELECT status, COUNT(*)
                FROM webhook_observation_entries
                GROUP BY status
                "#,
            )
            .map_err(|error| {
                format!("failed to query local webhook observation counts: {error}")
            })?;
        let rows = statement
            .query_map([], |row| {
                Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
            })
            .map_err(|error| format!("failed to map local webhook observation counts: {error}"))?
            .collect::<Result<Vec<_>, _>>()
            .map_err(|error| {
                format!("failed to collect local webhook observation counts: {error}")
            })?;
        decode_status_counts(rows)
    }

    pub(super) fn recent(&self, limit: usize) -> Result<Vec<WebhookObservationEvent>, String> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        let connection = self
            .connection
            .lock()
            .map_err(|_| "webhook observation store is poisoned".to_string())?;
        let mut statement = connection
            .prepare(
                r#"
                SELECT
                    id,
                    recorded_at_unix_seconds,
                    app_id,
                    source,
                    event,
                    status,
                    trace_id,
                    principal_kind,
                    principal_id,
                    detail
                FROM webhook_observation_entries
                ORDER BY recorded_at_unix_seconds DESC, id DESC
                LIMIT ?1
                "#,
            )
            .map_err(|error| {
                format!("failed to query local webhook observation entries: {error}")
            })?;
        let mut records = statement
            .query_map(params![limit as i64], |row| {
                let status = row.get::<_, String>(5)?;
                let status = WebhookObservationStatus::from_db_value(&status).map_err(|error| {
                    rusqlite::Error::FromSqlConversionFailure(
                        5,
                        rusqlite::types::Type::Text,
                        Box::new(std::io::Error::new(std::io::ErrorKind::InvalidData, error)),
                    )
                })?;
                Ok(WebhookObservationEvent {
                    id: row.get(0)?,
                    recorded_at_unix_seconds: row.get(1)?,
                    app_id: row.get(2)?,
                    source: row.get(3)?,
                    event: row.get(4)?,
                    status,
                    trace_id: row.get(6)?,
                    principal_kind: row.get(7)?,
                    principal_id: row.get(8)?,
                    detail: row.get(9)?,
                })
            })
            .map_err(|error| format!("failed to map local webhook observation entries: {error}"))?
            .collect::<Result<Vec<_>, _>>()
            .map_err(|error| {
                format!("failed to collect local webhook observation entries: {error}")
            })?;
        records.reverse();
        Ok(records)
    }

    pub(super) fn claim_delivery(
        &self,
        app_id: &str,
        route_name: &str,
        source: &str,
        delivery_id: &str,
        request_id: &str,
        recorded_at_unix_seconds: i64,
    ) -> Result<bool, String> {
        let mut connection = self
            .connection
            .lock()
            .map_err(|_| "webhook observation store is poisoned".to_string())?;
        let tx = connection.transaction().map_err(|error| {
            format!("failed to start local verified webhook replay transaction: {error}")
        })?;
        let inserted = tx
            .execute(
                r#"
                INSERT OR IGNORE INTO verified_webhook_deliveries (
                    app_id,
                    route_name,
                    source,
                    delivery_id,
                    first_seen_request_id,
                    first_seen_at_unix_seconds
                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                "#,
                params![
                    app_id,
                    route_name,
                    source,
                    delivery_id,
                    request_id,
                    recorded_at_unix_seconds,
                ],
            )
            .map_err(|error| format!("failed to persist verified webhook delivery: {error}"))?;
        tx.commit().map_err(|error| {
            format!("failed to commit verified webhook delivery receipt: {error}")
        })?;
        Ok(inserted > 0)
    }
}

fn database_path(root: &Path, namespace: &str) -> PathBuf {
    root.join("wasm")
        .join("webhooks")
        .join(format!("{}.sqlite3", sanitize_namespace(namespace)))
}

fn sanitize_namespace(namespace: &str) -> String {
    namespace
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.') {
                ch
            } else {
                '_'
            }
        })
        .collect()
}