tandem-server 0.5.6

HTTP server for Tandem engine APIs
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use tandem_channels::traits::InteractiveCardSent;
use tokio::sync::RwLock;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ApprovalMessageRecord {
    pub request_id: String,
    pub channel: String,
    pub recipient: String,
    pub message_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub thread_id: Option<String>,
}

impl ApprovalMessageRecord {
    pub fn from_sent(request_id: impl Into<String>, sent: InteractiveCardSent) -> Self {
        Self {
            request_id: request_id.into(),
            channel: sent.channel,
            recipient: sent.recipient,
            message_id: sent.message_id,
            thread_id: sent.thread_id,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ApprovalCallbackRecord {
    pub callback_id: String,
    pub request_id: String,
    pub run_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub node_id: Option<String>,
    pub channel: String,
    pub recipient: String,
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct ApprovalMessageMapFile {
    #[serde(default)]
    messages: HashMap<String, ApprovalMessageRecord>,
    #[serde(default)]
    run_threads: HashMap<String, ApprovalMessageRecord>,
    #[serde(default)]
    telegram_callbacks: HashMap<String, ApprovalCallbackRecord>,
}

#[derive(Debug, Clone)]
pub struct ApprovalMessageMap {
    path: PathBuf,
    data: Arc<RwLock<ApprovalMessageMapFile>>,
}

impl ApprovalMessageMap {
    pub async fn load_or_default(path: impl Into<PathBuf>) -> Self {
        let path = path.into();
        let data = load_message_map(&path).await.unwrap_or_default();
        Self {
            path,
            data: Arc::new(RwLock::new(data)),
        }
    }

    pub fn ephemeral() -> Self {
        Self {
            path: PathBuf::new(),
            data: Arc::new(RwLock::new(ApprovalMessageMapFile::default())),
        }
    }

    pub async fn record_sent(
        &self,
        request_id: impl Into<String>,
        sent: InteractiveCardSent,
    ) -> anyhow::Result<()> {
        let record = ApprovalMessageRecord::from_sent(request_id, sent);
        self.record_message(record, None).await
    }

    pub async fn record_approval_sent(
        &self,
        request: &tandem_types::ApprovalRequest,
        sent: InteractiveCardSent,
    ) -> anyhow::Result<()> {
        let record = ApprovalMessageRecord::from_sent(request.request_id.clone(), sent);
        self.record_message(record, Some(request.run_id.as_str()))
            .await
    }

    pub async fn record_telegram_callback(
        &self,
        callback_id: impl Into<String>,
        request: &tandem_types::ApprovalRequest,
        recipient: impl Into<String>,
    ) -> anyhow::Result<()> {
        let callback_id = callback_id.into();
        let record = ApprovalCallbackRecord {
            callback_id: callback_id.clone(),
            request_id: request.request_id.clone(),
            run_id: request.run_id.clone(),
            node_id: request.node_id.clone(),
            channel: "telegram".to_string(),
            recipient: recipient.into(),
        };
        let mut data = self.data.write().await;
        data.telegram_callbacks.insert(callback_id, record);
        self.persist_locked(&data).await
    }

    async fn record_message(
        &self,
        record: ApprovalMessageRecord,
        run_id: Option<&str>,
    ) -> anyhow::Result<()> {
        let mut data = self.data.write().await;
        if let Some(run_id) = run_id.map(str::trim).filter(|value| !value.is_empty()) {
            data.run_threads.insert(run_id.to_string(), record.clone());
        }
        data.messages.insert(record.request_id.clone(), record);
        self.persist_locked(&data).await
    }

    pub async fn get(&self, request_id: &str) -> Option<ApprovalMessageRecord> {
        self.data.read().await.messages.get(request_id).cloned()
    }

    pub async fn get_thread_for_run(&self, run_id: &str) -> Option<ApprovalMessageRecord> {
        self.data.read().await.run_threads.get(run_id).cloned()
    }

    pub async fn get_telegram_callback(&self, callback_id: &str) -> Option<ApprovalCallbackRecord> {
        self.data
            .read()
            .await
            .telegram_callbacks
            .get(callback_id)
            .cloned()
    }

    async fn persist_locked(&self, data: &ApprovalMessageMapFile) -> anyhow::Result<()> {
        if self.path.as_os_str().is_empty() {
            return Ok(());
        }
        if let Some(parent) = self.path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        let payload = serde_json::to_string_pretty(data)?;
        let tmp = self.path.with_extension("tmp");
        tokio::fs::write(&tmp, payload).await?;
        tokio::fs::rename(tmp, &self.path).await?;
        Ok(())
    }
}

async fn load_message_map(path: &Path) -> anyhow::Result<ApprovalMessageMapFile> {
    let raw = match tokio::fs::read_to_string(path).await {
        Ok(raw) => raw,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            return Ok(ApprovalMessageMapFile::default())
        }
        Err(err) => return Err(err.into()),
    };
    serde_json::from_str(&raw).map_err(Into::into)
}

#[cfg(test)]
mod tests {
    use super::*;
    use tandem_types::{ApprovalDecision, ApprovalRequest, ApprovalSourceKind, ApprovalTenantRef};

    fn sent(message_id: &str) -> InteractiveCardSent {
        InteractiveCardSent {
            channel: "slack".to_string(),
            message_id: message_id.to_string(),
            recipient: "C123".to_string(),
            thread_id: Some("1700000000.000100".to_string()),
        }
    }

    fn request(run_id: &str) -> ApprovalRequest {
        ApprovalRequest {
            request_id: format!("automation_v2:{run_id}:send_email"),
            source: ApprovalSourceKind::AutomationV2,
            tenant: ApprovalTenantRef {
                org_id: "org".to_string(),
                workspace_id: "workspace".to_string(),
                user_id: None,
            },
            run_id: run_id.to_string(),
            node_id: Some("send_email".to_string()),
            workflow_name: Some("Sales outreach".to_string()),
            action_kind: Some("send email".to_string()),
            action_preview_markdown: None,
            surface_payload: None,
            requested_at_ms: 1,
            expires_at_ms: None,
            decisions: vec![ApprovalDecision::Approve],
            rework_targets: vec![],
            instructions: None,
            decided_by: None,
            decided_at_ms: None,
            decision: None,
            rework_feedback: None,
        }
    }

    #[tokio::test]
    async fn records_and_reads_sent_message() {
        let map = ApprovalMessageMap::ephemeral();
        map.record_sent("req-1", sent("1700000000.000100"))
            .await
            .unwrap();

        let record = map.get("req-1").await.unwrap();
        assert_eq!(record.channel, "slack");
        assert_eq!(record.message_id, "1700000000.000100");
    }

    #[tokio::test]
    async fn persists_message_map_to_disk() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("approval_message_map.json");
        let map = ApprovalMessageMap::load_or_default(&path).await;
        map.record_sent("req-1", sent("1700000000.000100"))
            .await
            .unwrap();

        let loaded = ApprovalMessageMap::load_or_default(&path).await;
        let record = loaded.get("req-1").await.unwrap();
        assert_eq!(record.recipient, "C123");
        assert_eq!(record.thread_id.as_deref(), Some("1700000000.000100"));
    }

    #[tokio::test]
    async fn records_run_thread_lookup() {
        let map = ApprovalMessageMap::ephemeral();
        let request = request("run-1");
        map.record_approval_sent(&request, sent("1700000000.000100"))
            .await
            .unwrap();

        let record = map.get_thread_for_run("run-1").await.unwrap();
        assert_eq!(record.request_id, "automation_v2:run-1:send_email");
        assert_eq!(record.thread_id.as_deref(), Some("1700000000.000100"));
    }

    #[tokio::test]
    async fn persists_run_thread_lookup_to_disk() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("approval_message_map.json");
        let map = ApprovalMessageMap::load_or_default(&path).await;
        let request = request("run-1");
        map.record_approval_sent(&request, sent("1700000000.000100"))
            .await
            .unwrap();

        let loaded = ApprovalMessageMap::load_or_default(&path).await;
        let record = loaded.get_thread_for_run("run-1").await.unwrap();
        assert_eq!(record.recipient, "C123");
        assert_eq!(record.message_id, "1700000000.000100");
    }

    #[tokio::test]
    async fn records_and_reads_telegram_callback_mapping() {
        let map = ApprovalMessageMap::ephemeral();
        let request = request("run-abcdef");
        map.record_telegram_callback("tgcb_123", &request, "12345")
            .await
            .unwrap();

        let record = map.get_telegram_callback("tgcb_123").await.unwrap();
        assert_eq!(record.request_id, "automation_v2:run-abcdef:send_email");
        assert_eq!(record.run_id, "run-abcdef");
        assert_eq!(record.node_id.as_deref(), Some("send_email"));
        assert_eq!(record.recipient, "12345");
    }

    #[tokio::test]
    async fn persists_telegram_callback_mapping_to_disk() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("approval_message_map.json");
        let map = ApprovalMessageMap::load_or_default(&path).await;
        let request = request("run-abcdef");
        map.record_telegram_callback("tgcb_123", &request, "12345")
            .await
            .unwrap();

        let loaded = ApprovalMessageMap::load_or_default(&path).await;
        let record = loaded.get_telegram_callback("tgcb_123").await.unwrap();
        assert_eq!(record.run_id, "run-abcdef");
        assert_eq!(record.node_id.as_deref(), Some("send_email"));
    }
}