gsm-core 0.4.2

Core types and platform abstractions for the Greentic messaging runtime.
Documentation
use std::sync::Arc;

use axum::{
    Json,
    extract::{Path, State},
};
use greentic_types::TenantCtx;
use gsm_core::platforms::webchat::{
    config::{Config, OAuthProviderConfig},
    conversation::memory_store,
    directline_client::{DirectLineError, MockDirectLineApi},
    error::WebChatError,
    http::{
        AdminPath, AdminPostActivityRequest, AppState, DirectLinePoster, SharedDirectLinePoster,
        admin_post_activity,
    },
    oauth::GreenticOauthClient,
    session::{MemorySessionStore, SharedSessionStore, WebchatSession, WebchatSessionStore},
};
use reqwest::Client;
use serde_json::{Value, json};
use tokio::sync::Mutex;

#[path = "webchat_support.rs"]
mod support;

use support::{provider_with_secrets, signing_scope, tenant_ctx, tenant_scope};

#[tokio::test]
async fn admin_posts_to_specific_conversation() {
    let direct_line = Arc::new(MockDirectLineApi::default());
    let client = Client::builder().build().unwrap();
    let store = Arc::new(MemorySessionStore::default());
    let sessions: SharedSessionStore = store.clone();
    store
        .upsert(WebchatSession::new(
            "conv-1".to_string(),
            tenant_ctx("dev", "acme", Some("support")),
            "token-1".to_string(),
        ))
        .await
        .unwrap();
    let conversations = memory_store();
    conversations
        .create("conv-1", tenant_ctx("dev", "acme", Some("support")))
        .await
        .unwrap();

    let poster = Arc::new(RecordingPoster::default());
    let scope = tenant_scope("dev", "acme", None);
    let provider = provider_with_secrets(
        Config::with_base_url("https://directline.test/v3/directline"),
        signing_scope(),
        &[(&scope, "webchat", "channel_token", "dl-secret")],
    );
    let state = AppState::new(provider, direct_line, client.clone())
        .with_sessions(sessions)
        .with_activity_poster(poster.clone() as SharedDirectLinePoster)
        .with_oauth_client(Arc::new(StubOauthClient))
        .with_conversations(conversations.clone());

    let Json(payload) = admin_post_activity(
        State(state),
        Path(AdminPath {
            env: "dev".to_string(),
            tenant: "acme".to_string(),
        }),
        Json(AdminPostActivityRequest {
            team: None,
            conversation_id: Some("conv-1".to_string()),
            activity: json!({
                "type": "message",
                "text": "hello proactive"
            }),
        }),
    )
    .await
    .unwrap();

    assert_eq!(payload.posted, 1);
    assert_eq!(payload.skipped, 0);

    let calls = poster.calls.lock().await;
    assert!(calls.is_empty(), "should not post via Direct Line");

    let page = conversations
        .activities("conv-1", None)
        .await
        .expect("conversation activities");
    assert_eq!(page.activities.len(), 1);
    let stored = &page.activities[0].activity;
    assert_eq!(stored.text.as_deref(), Some("hello proactive"));
    assert_eq!(
        stored
            .from
            .as_ref()
            .map(|from| (from.id.as_str(), from.role.as_deref())),
        Some(("bot", Some("bot")))
    );
    let session = store.get("conv-1").await.unwrap().unwrap();
    assert_eq!(session.watermark.as_deref(), Some("1"));
}

#[tokio::test]
async fn admin_broadcast_respects_proactive_flags() {
    let direct_line = Arc::new(MockDirectLineApi::default());
    let client = Client::builder().build().unwrap();
    let store = Arc::new(MemorySessionStore::default());
    let sessions: SharedSessionStore = store.clone();

    store
        .upsert(WebchatSession::new(
            "conv-allow".to_string(),
            tenant_ctx("DEV", "ACME", Some("Support")),
            "token-allow".to_string(),
        ))
        .await
        .unwrap();
    store
        .upsert(WebchatSession::new(
            "conv-blocked".to_string(),
            tenant_ctx("dev", "acme", Some("support")),
            "token-blocked".to_string(),
        ))
        .await
        .unwrap();
    store.set_proactive("conv-blocked", false).await.unwrap();
    store
        .upsert(WebchatSession::new(
            "conv-other".to_string(),
            tenant_ctx("dev", "acme", Some("other")),
            "token-other".to_string(),
        ))
        .await
        .unwrap();
    let conversations = memory_store();
    conversations
        .create("conv-allow", tenant_ctx("dev", "acme", Some("support")))
        .await
        .unwrap();

    let poster = Arc::new(RecordingPoster::default());
    let scope = tenant_scope("dev", "acme", None);
    let provider = provider_with_secrets(
        Config::with_base_url("https://directline.test/v3/directline"),
        signing_scope(),
        &[(&scope, "webchat", "channel_token", "dl-secret")],
    );
    let state = AppState::new(provider, direct_line, client.clone())
        .with_sessions(sessions)
        .with_activity_poster(poster.clone() as SharedDirectLinePoster)
        .with_oauth_client(Arc::new(StubOauthClient))
        .with_conversations(conversations.clone());

    let Json(payload) = admin_post_activity(
        State(state),
        Path(AdminPath {
            env: "dev".to_string(),
            tenant: "acme".to_string(),
        }),
        Json(AdminPostActivityRequest {
            team: Some("support".to_string()),
            conversation_id: None,
            activity: json!({
                "type": "event",
                "name": "proactive.ping"
            }),
        }),
    )
    .await
    .unwrap();

    assert_eq!(payload.posted, 1);
    assert_eq!(payload.skipped, 1);

    let calls = poster.calls.lock().await;
    assert!(calls.is_empty());

    let page = conversations
        .activities("conv-allow", None)
        .await
        .expect("allow conversation activities");
    assert_eq!(page.activities.len(), 1);
    let stored = &page.activities[0].activity;
    assert_eq!(stored.from.as_ref().unwrap().id, "bot");
    assert_eq!(stored.r#type, "event");
    assert_eq!(stored.text.as_deref(), None);
}

#[tokio::test]
async fn admin_errors_for_unknown_conversation() {
    let direct_line = Arc::new(MockDirectLineApi::default());
    let client = Client::builder().build().unwrap();
    let store = Arc::new(MemorySessionStore::default());
    let sessions: SharedSessionStore = store.clone();
    let conversations = memory_store();

    let poster = Arc::new(RecordingPoster::default());
    let scope = tenant_scope("dev", "acme", None);
    let provider = provider_with_secrets(
        Config::with_base_url("https://directline.test/v3/directline"),
        signing_scope(),
        &[(&scope, "webchat", "channel_token", "dl-secret")],
    );
    let state = AppState::new(provider, direct_line, client.clone())
        .with_sessions(sessions)
        .with_activity_poster(poster.clone() as SharedDirectLinePoster)
        .with_oauth_client(Arc::new(StubOauthClient))
        .with_conversations(conversations.clone());

    let result = admin_post_activity(
        State(state),
        Path(AdminPath {
            env: "dev".to_string(),
            tenant: "acme".to_string(),
        }),
        Json(AdminPostActivityRequest {
            team: None,
            conversation_id: Some("unknown".to_string()),
            activity: json!({
                "type": "message",
                "text": "hello proactive"
            }),
        }),
    )
    .await;

    match result {
        Ok(_) => panic!("expected not found error"),
        Err(WebChatError::NotFound(message)) => {
            assert_eq!(message, "conversation not found");
        }
        Err(other) => panic!("unexpected error: {other:?}"),
    }
}

#[derive(Default)]
struct RecordingPoster {
    calls: Mutex<Vec<(String, Value)>>,
}

#[async_trait::async_trait]
impl DirectLinePoster for RecordingPoster {
    async fn post_activity(
        &self,
        _base_url: &str,
        conversation_id: &str,
        _bearer_token: &str,
        activity: Value,
    ) -> Result<(), DirectLineError> {
        self.calls
            .lock()
            .await
            .push((conversation_id.to_string(), activity));
        Ok(())
    }
}

struct StubOauthClient;

#[async_trait::async_trait]
impl GreenticOauthClient for StubOauthClient {
    async fn exchange_code(
        &self,
        _tenant_ctx: &TenantCtx,
        _config: &OAuthProviderConfig,
        _code: &str,
        _redirect_uri: &str,
    ) -> Result<String, anyhow::Error> {
        Ok("oauth-token-handle".to_string())
    }
}