fn0-worker 0.3.33

Worker binary for the fn0 FaaS platform
use crate::cache::S3BundleCache;
use doc_db::{DbRequest, Database};
use fn0_shared_schema::WorkerManifestDocGet;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

const POLL_INTERVAL: Duration = Duration::from_secs(1);

pub fn build_database_from_env() -> anyhow::Result<Database> {
    let group_token = std::env::var("TURSO_GROUP_TOKEN")
        .map_err(|_| anyhow::anyhow!("TURSO_GROUP_TOKEN not set"))?;
    let host_suffix = std::env::var("TURSO_DB_HOST_SUFFIX")
        .map_err(|_| anyhow::anyhow!("TURSO_DB_HOST_SUFFIX not set"))?;
    let url = format!("https://fn0-control{host_suffix}");
    Ok(doc_db::turso_with_config(url, group_token))
}

pub async fn run(db: Database, cache: S3BundleCache, manifest_loaded: Arc<AtomicBool>) {
    let mut last_version: Option<u64> = None;
    let mut known_projects: HashMap<String, u64> = HashMap::new();
    let mut known_domains: HashMap<String, String> = HashMap::new();

    loop {
        tokio::time::sleep(POLL_INTERVAL).await;

        let manifest = match (WorkerManifestDocGet {}).send_with(&db).await {
            Ok(Some(m)) => m,
            Ok(None) => continue,
            Err(err) => {
                tracing::warn!(%err, "manifest fetch failed");
                continue;
            }
        };

        if last_version == Some(manifest.manifest_version) {
            continue;
        }

        let mut new_projects: HashMap<String, u64> = HashMap::new();
        let mut new_domains: HashMap<String, String> = HashMap::new();

        for (project_id, project_manifest) in &manifest.project_manifests {
            new_projects.insert(project_id.clone(), project_manifest.code_version);
            cache
                .register(project_id, project_manifest.code_version)
                .await;
            if let Some(domain) = &project_manifest.custom_domain {
                new_domains.insert(domain.clone(), project_id.clone());
                cache.register_domain(domain, project_id).await;
            }
        }

        for project_id in known_projects.keys() {
            if !new_projects.contains_key(project_id) {
                cache.unregister(project_id).await;
            }
        }
        for domain in known_domains.keys() {
            if !new_domains.contains_key(domain) {
                cache.unregister_domain(domain).await;
            }
        }

        known_projects = new_projects;
        known_domains = new_domains;
        last_version = Some(manifest.manifest_version);
        manifest_loaded.store(true, Ordering::Release);

        tracing::info!(
            manifest_version = manifest.manifest_version,
            projects = known_projects.len(),
            domains = known_domains.len(),
            "manifest applied"
        );
    }
}