// gsm-core 0.4.21
//
// Core types and platform abstractions for the Greentic messaging runtime.
// Documentation
use std::{
    collections::VecDeque,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::Duration,
};

use greentic_types::{EnvId, TenantCtx, TenantId};
use gsm_core::platforms::webchat::{
    bus::{EventBus, Subject},
    circuit::CircuitSettings,
    ingress::{
        ActivitiesEnvelope, ActivitiesTransport, ActivitiesTransportResponse, IngressCtx,
        IngressDeps, SharedActivitiesTransport, run_poll_loop,
    },
    normalize_activity,
    session::{MemorySessionStore, SharedSessionStore, WebchatSession, WebchatSessionStore},
    types::{GreenticEvent, IncomingMessage, MessagePayload},
};
use reqwest::StatusCode;
use serde_json::{Value, json};
use tokio::{sync::Mutex, time::Instant};

/// Test double for `EventBus` that keeps every published event in memory.
///
/// Each entry pairs the subject (as an owned string) with a clone of the
/// event, in the order `publish` was called.
struct RecordingBus {
    // Async mutex: the list is appended to from `publish` and read back by
    // the tests after the poll loop has finished.
    events: Mutex<Vec<(String, GreenticEvent)>>,
}

impl RecordingBus {
    /// Creates a bus with an empty event log.
    fn new() -> Self {
        let events = Mutex::new(Vec::new());
        Self { events }
    }
}

#[async_trait::async_trait]
impl EventBus for RecordingBus {
    /// Appends `(subject, event)` to the in-memory log and always returns `Ok(())`.
    async fn publish(&self, subject: &Subject, event: &GreenticEvent) -> anyhow::Result<()> {
        let mut recorded = self.events.lock().await;
        let subject_name = subject.as_str().to_string();
        recorded.push((subject_name, event.clone()));
        Ok(())
    }
}

/// Builds the tenant context shared by the tests: environment `dev`, tenant `acme`.
fn tenant_ctx() -> TenantCtx {
    let env = EnvId("dev".to_string());
    let tenant = TenantId("acme".to_string());
    TenantCtx::new(env, tenant)
}

/// Builds a session for conversation `conv-1` using the shared tenant context
/// and the placeholder token `token`.
fn webchat_session() -> WebchatSession {
    let conversation_id = "conv-1".to_string();
    let tenant = tenant_ctx();
    WebchatSession::new(conversation_id, tenant, "token".into())
}

/// Boxed closure that inspects a normalized `IncomingMessage`; used as the
/// per-case check in the table-driven `normalize_activity_variants` test.
type CaseAssertion = Box<dyn Fn(&IncomingMessage) + Send + Sync>;

#[tokio::test]
async fn normalize_activity_variants() {
    let session = webchat_session();
    let cases: Vec<(&str, Value, CaseAssertion)> = vec![
        (
            "text",
            json!({
                "type": "message",
                "id": "msg-1",
                "timestamp": "2024-01-01T00:00:00Z",
                "text": "hello",
                "locale": "en-US",
                "from": {"id": "user-1"}
            }),
            Box::new(|msg| match &msg.payload {
                MessagePayload::Text { text, locale } => {
                    assert_eq!(text, "hello");
                    assert_eq!(locale.as_deref(), Some("en-US"));
                }
                other => panic!("unexpected payload: {other:?}"),
            }),
        ),
        (
            "typing",
            json!({
                "type": "typing",
                "id": "typing-1",
                "timestamp": "2024-01-01T00:00:01Z",
                "from": {"id": "user-1", "name": "Greentic"}
            }),
            Box::new(|msg| {
                assert!(matches!(msg.payload, MessagePayload::Typing));
            }),
        ),
        (
            "event",
            json!({
                "type": "event",
                "name": "handoff",
                "value": {"dest": "agent"},
                "id": "evt-1",
                "timestamp": "2024-01-01T00:00:02Z",
                "from": {"id": "system"}
            }),
            Box::new(|msg| match &msg.payload {
                MessagePayload::Event { name, value } => {
                    assert_eq!(name, "handoff");
                    assert!(value.is_some());
                }
                other => panic!("unexpected payload: {other:?}"),
            }),
        ),
        (
            "attachment",
            json!({
                "type": "message",
                "id": "msg-2",
                "timestamp": "2024-01-01T00:00:03Z",
                "attachments": [
                    {"contentType": "application/vnd.microsoft.card.adaptive", "content": {"title": "Card"}}
                ],
                "from": {"id": "bot"}
            }),
            Box::new(|msg| match &msg.payload {
                MessagePayload::Attachment {
                    content_type,
                    content,
                } => {
                    assert_eq!(content_type, "application/vnd.microsoft.card.adaptive");
                    assert_eq!(content["title"], "Card");
                }
                other => panic!("unexpected payload: {other:?}"),
            }),
        ),
    ];

    for (name, activity, assert_fn) in cases {
        let result = normalize_activity(&session, &activity).expect(name);
        assert_eq!(result.from.id, activity["from"]["id"].as_str().unwrap());
        assert_fn(&result);
    }
}

/// Drives `run_poll_loop` with a scripted transport: a 200 response carrying
/// one text message and watermark `"10"`, followed by a 401.
///
/// Expects the loop to return `Ok`, the stored session for `conv-1` to hold
/// watermark `"10"`, exactly one event on the recording bus, and exactly two
/// transport polls.
#[tokio::test]
async fn poll_loop_updates_watermark_and_publishes() {
    let hello_activity = json!({
        "type": "message",
        "id": "m-1",
        "timestamp": "2024-01-01T00:00:00Z",
        "text": "hello",
        "from": {"id": "user"}
    });
    let scripted = vec![
        ok_response(vec![hello_activity], Some("10")),
        status_response(StatusCode::UNAUTHORIZED),
    ];
    let (transport, poll_count) = queue_transport(scripted);

    let recorder = Arc::new(RecordingBus::new());
    let memory_store = Arc::new(MemorySessionStore::default());
    let sessions: SharedSessionStore = memory_store.clone();

    let ctx = IngressCtx {
        tenant_ctx: tenant_ctx(),
        conversation_id: "conv-1".into(),
        token: "token".into(),
    };
    let deps = IngressDeps {
        // Keep our own handles so the bus and store can be inspected afterwards.
        bus: recorder.clone(),
        sessions: sessions.clone(),
        dl_base: "https://directline.example/v3/directline".into(),
        transport,
        circuit: CircuitSettings::default(),
    };

    run_poll_loop(deps, ctx).await.unwrap();

    let session = memory_store.get("conv-1").await.unwrap().unwrap();
    assert_eq!(session.watermark.as_deref(), Some("10"));

    let published = recorder.events.lock().await;
    assert_eq!(published.len(), 1);
    assert_eq!(poll_count.load(Ordering::SeqCst), 2);
}

/// Drives `run_poll_loop` with a scripted transport that answers 429, 500,
/// then a 200 carrying a typing activity (watermark `"11"`), then 403.
///
/// The tokio clock is paused before the loop runs. The test expects the loop
/// to return `Ok`, at least three transport polls, and at least 10ms elapsed
/// on the tokio clock — presumably from the loop's retry delays; confirm
/// against `run_poll_loop`.
#[tokio::test]
async fn poll_loop_retries_with_backoff() {
    let typing_activity = json!({
        "type": "typing",
        "id": "m-2",
        "timestamp": "2024-01-01T00:00:00Z",
        "from": {"id": "user"}
    });
    let scripted = vec![
        status_response(StatusCode::TOO_MANY_REQUESTS),
        status_response(StatusCode::INTERNAL_SERVER_ERROR),
        ok_response(vec![typing_activity], Some("11")),
        status_response(StatusCode::FORBIDDEN),
    ];
    let (transport, counter) = queue_transport(scripted);

    // Pause the clock before constructing the loop's collaborators.
    tokio::time::pause();
    let bus = Arc::new(RecordingBus::new());
    let memory_store = Arc::new(MemorySessionStore::default());
    let sessions: SharedSessionStore = memory_store.clone();

    let deps = IngressDeps {
        bus,
        sessions,
        dl_base: "https://directline.example/v3/directline".into(),
        transport,
        circuit: CircuitSettings::default(),
    };
    let ctx = IngressCtx {
        tenant_ctx: tenant_ctx(),
        conversation_id: "conv-1".into(),
        token: "token".into(),
    };

    let start = Instant::now();
    let outcome = run_poll_loop(deps, ctx).await;
    outcome.unwrap();

    assert!(counter.load(Ordering::SeqCst) >= 3);
    assert!(start.elapsed() >= Duration::from_millis(10));
}

/// Builds a 200 OK transport response whose body carries `activities` and the
/// optional `watermark` (copied into an owned string).
fn ok_response(activities: Vec<Value>, watermark: Option<&str>) -> ActivitiesTransportResponse {
    let envelope = ActivitiesEnvelope {
        activities,
        watermark: watermark.map(str::to_string),
    };
    ActivitiesTransportResponse {
        status: StatusCode::OK,
        body: Some(envelope),
    }
}

/// Builds a body-less transport response with the given HTTP `status`.
fn status_response(status: StatusCode) -> ActivitiesTransportResponse {
    let body = None;
    ActivitiesTransportResponse { status, body }
}

/// Wraps `responses` in a `MockTransport` that hands them out in order.
///
/// Returns the transport as a shared trait object together with a handle to
/// the counter the mock increments on every poll.
fn queue_transport(
    responses: Vec<ActivitiesTransportResponse>,
) -> (SharedActivitiesTransport, Arc<AtomicUsize>) {
    let calls = Arc::new(AtomicUsize::new(0));
    let pending: VecDeque<ActivitiesTransportResponse> = responses.into();
    let mock = MockTransport {
        responses: Mutex::new(pending),
        counter: Arc::clone(&calls),
    };
    let shared: SharedActivitiesTransport = Arc::new(mock);
    (shared, calls)
}

/// Scripted `ActivitiesTransport` used by the poll-loop tests.
struct MockTransport {
    // Responses still to be served, handed out front-first by `poll`.
    responses: Mutex<VecDeque<ActivitiesTransportResponse>>,
    // Number of `poll` calls made so far; shared with the test via `queue_transport`.
    counter: Arc<AtomicUsize>,
}

#[async_trait::async_trait]
impl ActivitiesTransport for MockTransport {
    /// Counts the call, then returns the next scripted response.
    ///
    /// All arguments are ignored. Once the script is exhausted every further
    /// call yields a body-less 500 response. Never returns `Err`.
    async fn poll(
        &self,
        _: &str,
        _: &str,
        _: &str,
        _: Option<&str>,
    ) -> anyhow::Result<ActivitiesTransportResponse> {
        self.counter.fetch_add(1, Ordering::SeqCst);
        let mut queue = self.responses.lock().await;
        match queue.pop_front() {
            Some(scripted) => Ok(scripted),
            None => Ok(status_response(StatusCode::INTERNAL_SERVER_ERROR)),
        }
    }
}