Skip to main content

gsm_core/platforms/webchat/
http.rs

1use std::{collections::HashMap, future::Future, sync::Arc};
2
3use axum::http::StatusCode;
4use axum::{
5    Json, Router,
6    extract::{Path, State},
7    routing::{get, post},
8};
9use metrics::counter;
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use tracing::warn;
13
14use super::conversation::{
15    Activity, ChannelAccount, ConversationAccount, SharedConversationStore, StoreError, noop_store,
16};
17use super::{
18    WebChatProvider, backoff,
19    bus::{NoopBus, SharedBus},
20    circuit::{CircuitBreaker, CircuitLabels, CircuitSettings},
21    directline_client::{ConversationResponse, DirectLineApi, DirectLineError, TokenResponse},
22    error::WebChatError,
23    ingress::{
24        IngressCtx, IngressDeps, ReqwestActivitiesTransport, SharedActivitiesTransport,
25        run_poll_loop,
26    },
27    oauth::{GreenticOauthClient, ReqwestGreenticOauthClient},
28    provider::RouteContext,
29    session::{MemorySessionStore, SharedSessionStore, WebchatSession},
30    telemetry,
31};
32use async_trait::async_trait;
33use greentic_types::{EnvId, TeamId, TenantCtx, TenantId};
34use reqwest::Client;
35use tokio::{spawn, sync::Mutex as AsyncMutex};
36
37pub struct AppState {
38    pub provider: WebChatProvider,
39    pub direct_line: Arc<dyn DirectLineApi>,
40    pub http_client: Client,
41    pub transport: SharedActivitiesTransport,
42    pub bus: SharedBus,
43    pub sessions: SharedSessionStore,
44    pub activity_poster: SharedDirectLinePoster,
45    pub oauth_client: Arc<dyn GreenticOauthClient>,
46    pub conversations: SharedConversationStore,
47    circuit_settings: CircuitSettings,
48    token_circuits: CircuitRegistry,
49    conversation_circuits: CircuitRegistry,
50}
51
52impl Clone for AppState {
53    fn clone(&self) -> Self {
54        Self {
55            provider: self.provider.clone(),
56            direct_line: Arc::clone(&self.direct_line),
57            http_client: self.http_client.clone(),
58            transport: Arc::clone(&self.transport),
59            bus: Arc::clone(&self.bus),
60            sessions: Arc::clone(&self.sessions),
61            activity_poster: Arc::clone(&self.activity_poster),
62            oauth_client: Arc::clone(&self.oauth_client),
63            conversations: Arc::clone(&self.conversations),
64            circuit_settings: self.circuit_settings.clone(),
65            token_circuits: self.token_circuits.clone(),
66            conversation_circuits: self.conversation_circuits.clone(),
67        }
68    }
69}
70
71impl AppState {
72    pub fn new(
73        provider: WebChatProvider,
74        direct_line: Arc<dyn DirectLineApi>,
75        http_client: Client,
76    ) -> Self {
77        let transport_client = http_client.clone();
78        let poster_client = http_client.clone();
79        let oauth_client_http = http_client.clone();
80        let transport: SharedActivitiesTransport =
81            Arc::new(ReqwestActivitiesTransport::new(transport_client));
82        let activity_poster: SharedDirectLinePoster =
83            Arc::new(HttpDirectLinePoster::new(poster_client));
84        let oauth_client: Arc<dyn GreenticOauthClient> =
85            Arc::new(ReqwestGreenticOauthClient::new(oauth_client_http));
86        let circuit_settings = CircuitSettings::default();
87        Self {
88            provider,
89            direct_line,
90            http_client,
91            transport,
92            bus: Arc::new(NoopBus),
93            sessions: Arc::new(MemorySessionStore::default()),
94            activity_poster,
95            oauth_client,
96            conversations: noop_store(),
97            circuit_settings: circuit_settings.clone(),
98            token_circuits: CircuitRegistry::new(circuit_settings.clone()),
99            conversation_circuits: CircuitRegistry::new(circuit_settings),
100        }
101    }
102
103    pub fn with_bus(mut self, bus: SharedBus) -> Self {
104        self.bus = bus;
105        self
106    }
107
108    pub fn with_sessions(mut self, sessions: SharedSessionStore) -> Self {
109        self.sessions = sessions;
110        self
111    }
112
113    pub fn with_transport(mut self, transport: SharedActivitiesTransport) -> Self {
114        self.transport = transport;
115        self
116    }
117
118    pub fn with_activity_poster(mut self, poster: SharedDirectLinePoster) -> Self {
119        self.activity_poster = poster;
120        self
121    }
122
123    pub fn with_oauth_client(mut self, client: Arc<dyn GreenticOauthClient>) -> Self {
124        self.oauth_client = client;
125        self
126    }
127
128    pub fn with_conversations(mut self, conversations: SharedConversationStore) -> Self {
129        self.conversations = conversations;
130        self
131    }
132
133    pub async fn post_activity(
134        &self,
135        conversation_id: &str,
136        bearer_token: &str,
137        activity: Value,
138    ) -> Result<(), WebChatError> {
139        validate_activity_for_post(&activity)?;
140        self.activity_poster
141            .post_activity(
142                self.provider.config().direct_line_base(),
143                conversation_id,
144                bearer_token,
145                activity,
146            )
147            .await
148            .map_err(WebChatError::from)
149    }
150}
151
152pub fn router(state: AppState) -> Router<AppState> {
153    Router::new()
154        .route("/webchat/healthz", get(healthz))
155        .route(
156            "/webchat/{env}/{tenant}/tokens/generate",
157            post(generate_token),
158        )
159        .route(
160            "/webchat/{env}/{tenant}/{team}/tokens/generate",
161            post(generate_token),
162        )
163        .route(
164            "/webchat/{env}/{tenant}/conversations/start",
165            post(start_conversation),
166        )
167        .route(
168            "/webchat/{env}/{tenant}/{team}/conversations/start",
169            post(start_conversation),
170        )
171        .route(
172            "/webchat/admin/{env}/{tenant}/post-activity",
173            post(admin_post_activity),
174        )
175        .route("/webchat/oauth/start", get(super::oauth::start))
176        .route("/webchat/oauth/callback", get(super::oauth::callback))
177        .with_state(state)
178}
179
180async fn healthz() -> StatusCode {
181    StatusCode::NO_CONTENT
182}
183
184#[derive(Deserialize)]
185struct TokenPath {
186    env: String,
187    tenant: String,
188    #[serde(default)]
189    team: Option<String>,
190}
191
192#[derive(Default, Deserialize)]
193struct GenerateTokenRequestBody {}
194
195#[derive(Serialize)]
196struct GenerateTokenResponse {
197    token: String,
198    expires_in: u64,
199}
200
201#[derive(Deserialize)]
202struct StartConversationRequest {
203    token: String,
204}
205
206#[derive(Serialize)]
207struct StartConversationResponse {
208    token: String,
209    #[serde(rename = "conversationId")]
210    conversation_id: String,
211    #[serde(rename = "streamUrl", skip_serializing_if = "Option::is_none")]
212    stream_url: Option<String>,
213    expires_in: u64,
214}
215
216async fn generate_token(
217    State(state): State<AppState>,
218    Path(path): Path<TokenPath>,
219    Json(_): Json<GenerateTokenRequestBody>,
220) -> Result<Json<GenerateTokenResponse>, WebChatError> {
221    let ctx = RouteContext::new(path.env, path.tenant, path.team);
222    let span = telemetry::span_for("tokens.generate", &ctx);
223    let _guard = span.enter();
224    let team_label = telemetry::team_or_dash(ctx.team());
225    let env_metric = ctx.env().to_string();
226    let tenant_metric = ctx.tenant().to_string();
227    let team_metric = team_label.to_string();
228    let tenant_ctx = build_tenant_ctx(&ctx);
229
230    let secret = state
231        .provider
232        .direct_line_secret(&tenant_ctx)
233        .await
234        .map_err(|err| {
235            counter!(
236                "webchat_errors_total",
237                "kind" => "secret_backend_error",
238                "env" => env_metric.clone(),
239                "tenant" => tenant_metric.clone(),
240                "team" => team_metric.clone()
241            )
242            .increment(1);
243            WebChatError::Internal(err)
244        })?
245        .ok_or_else(|| {
246            counter!(
247                "webchat_errors_total",
248                "kind" => "missing_secret",
249                "env" => env_metric.clone(),
250                "tenant" => tenant_metric.clone(),
251                "team" => team_metric.clone()
252            )
253            .increment(1);
254            WebChatError::MissingSecret
255        })?;
256
257    let secret = Arc::new(secret);
258    let response = call_with_circuit(
259        &state.token_circuits,
260        direct_line_circuit_key("tokens", &ctx),
261        direct_line_labels(&ctx),
262        {
263            let direct_line = Arc::clone(&state.direct_line);
264            let secret = Arc::clone(&secret);
265            move || {
266                let direct_line = Arc::clone(&direct_line);
267                let secret = Arc::clone(&secret);
268                async move { direct_line.generate_token(secret.as_ref()).await }
269            }
270        },
271    )
272    .await
273    .map_err(|err| map_directline_error("tokens.generate", err))?;
274
275    counter!(
276        "webchat_tokens_generated_total",
277        "env" => env_metric.clone(),
278        "tenant" => tenant_metric.clone(),
279        "team" => team_metric.clone()
280    )
281    .increment(1);
282
283    Ok(Json(token_response_body(response)))
284}
285
286async fn start_conversation(
287    State(state): State<AppState>,
288    Path(path): Path<TokenPath>,
289    Json(body): Json<StartConversationRequest>,
290) -> Result<Json<StartConversationResponse>, WebChatError> {
291    let ctx = RouteContext::new(path.env, path.tenant, path.team);
292    let span = telemetry::span_for("conversations.start", &ctx);
293    let _guard = span.enter();
294    let env_metric = ctx.env().to_string();
295    let tenant_metric = ctx.tenant().to_string();
296    let team_metric = telemetry::team_or_dash(ctx.team()).to_string();
297
298    let trimmed = body.token.trim();
299    if trimmed.is_empty() {
300        return Err(WebChatError::BadRequest("token is required"));
301    }
302
303    let token_for_retry = Arc::new(trimmed.to_string());
304    let conversation_response = call_with_circuit(
305        &state.conversation_circuits,
306        direct_line_circuit_key("conversations", &ctx),
307        direct_line_labels(&ctx),
308        {
309            let direct_line = Arc::clone(&state.direct_line);
310            let token = Arc::clone(&token_for_retry);
311            move || {
312                let direct_line = Arc::clone(&direct_line);
313                let token = Arc::clone(&token);
314                async move { direct_line.start_conversation(token.as_ref()).await }
315            }
316        },
317    )
318    .await
319    .map_err(|err| map_directline_error("conversations.start", err))?;
320
321    let tenant_ctx = build_tenant_ctx(&ctx);
322    state
323        .sessions
324        .upsert(WebchatSession::new(
325            conversation_response.conversation_id.clone(),
326            tenant_ctx.clone(),
327            conversation_response.token.clone(),
328        ))
329        .await
330        .map_err(WebChatError::Internal)?;
331
332    counter!(
333        "webchat_conversations_started_total",
334        "env" => env_metric.clone(),
335        "tenant" => tenant_metric.clone(),
336        "team" => team_metric.clone()
337    )
338    .increment(1);
339    let ingress_deps = IngressDeps {
340        bus: Arc::clone(&state.bus),
341        sessions: Arc::clone(&state.sessions),
342        dl_base: state.provider.config().direct_line_base().to_string(),
343        transport: Arc::clone(&state.transport),
344        circuit: state.circuit_settings.clone(),
345    };
346    let ingress_ctx = IngressCtx {
347        tenant_ctx,
348        conversation_id: conversation_response.conversation_id.clone(),
349        token: conversation_response.token.clone(),
350    };
351
352    spawn(async move {
353        if let Err(err) = run_poll_loop(ingress_deps, ingress_ctx).await {
354            warn!(error = %err, "webchat poll loop terminated");
355        }
356    });
357
358    Ok(Json(conversation_response_body(conversation_response)))
359}
360
361fn token_response_body(response: TokenResponse) -> GenerateTokenResponse {
362    GenerateTokenResponse {
363        token: response.token,
364        expires_in: response.expires_in.unwrap_or(DEFAULT_EXPIRY_SECONDS),
365    }
366}
367
368fn conversation_response_body(response: ConversationResponse) -> StartConversationResponse {
369    StartConversationResponse {
370        token: response.token,
371        conversation_id: response.conversation_id,
372        stream_url: response.stream_url,
373        expires_in: response.expires_in.unwrap_or(DEFAULT_EXPIRY_SECONDS),
374    }
375}
376
377const DEFAULT_EXPIRY_SECONDS: u64 = 1800;
378
379fn build_tenant_ctx(ctx: &RouteContext) -> TenantCtx {
380    let mut tenant_ctx = TenantCtx::new(
381        EnvId(ctx.env().to_string()),
382        TenantId(ctx.tenant().to_string()),
383    );
384    if let Some(team) = ctx.team() {
385        let team_id = TeamId(team.to_string());
386        tenant_ctx = tenant_ctx.with_team(Some(team_id));
387    }
388    tenant_ctx
389}
390
391fn map_directline_error(action: &str, err: DirectLineError) -> WebChatError {
392    log_directline_error(action, &err);
393    WebChatError::from(err)
394}
395
396const MAX_DIRECT_LINE_RETRIES: u32 = 5;
397const NO_CONVERSATION_LABEL: &str = "-";
398
399#[derive(Clone)]
400struct CircuitRegistry {
401    settings: CircuitSettings,
402    inner: Arc<AsyncMutex<HashMap<String, Arc<AsyncMutex<CircuitBreaker>>>>>,
403}
404
405impl CircuitRegistry {
406    fn new(settings: CircuitSettings) -> Self {
407        Self {
408            settings,
409            inner: Arc::new(AsyncMutex::new(HashMap::new())),
410        }
411    }
412
413    async fn circuit(&self, key: &str, labels: CircuitLabels) -> Arc<AsyncMutex<CircuitBreaker>> {
414        let mut guard = self.inner.lock().await;
415        if let Some(existing) = guard.get(key) {
416            existing.clone()
417        } else {
418            let circuit = Arc::new(AsyncMutex::new(CircuitBreaker::new(
419                self.settings.clone(),
420                labels,
421            )));
422            guard.insert(key.to_string(), circuit.clone());
423            circuit
424        }
425    }
426}
427
428fn direct_line_circuit_key(scope: &str, ctx: &RouteContext) -> String {
429    let team = telemetry::team_or_dash(ctx.team());
430    let team_slug = team.to_ascii_lowercase();
431    format!(
432        "{scope}:{env}:{tenant}:{team}",
433        scope = scope,
434        env = ctx.env().to_ascii_lowercase(),
435        tenant = ctx.tenant().to_ascii_lowercase(),
436        team = team_slug
437    )
438}
439
440fn direct_line_labels(ctx: &RouteContext) -> CircuitLabels {
441    CircuitLabels::new(
442        ctx.env().to_string(),
443        ctx.tenant().to_string(),
444        telemetry::team_or_dash(ctx.team()).to_string(),
445        NO_CONVERSATION_LABEL.to_string(),
446    )
447}
448
449fn is_retryable(err: &DirectLineError) -> bool {
450    matches!(err, DirectLineError::Transport(_))
451        || matches!(err, DirectLineError::Remote { status, .. }
452            if *status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error())
453}
454
455async fn call_with_circuit<F, Fut, T>(
456    registry: &CircuitRegistry,
457    key: String,
458    labels: CircuitLabels,
459    mut operation: F,
460) -> Result<T, DirectLineError>
461where
462    F: FnMut() -> Fut,
463    Fut: Future<Output = Result<T, DirectLineError>>,
464{
465    let circuit = registry.circuit(&key, labels).await;
466    let mut attempt: u32 = 0;
467    loop {
468        {
469            let mut guard = circuit.lock().await;
470            guard.before_request().await;
471        }
472
473        match operation().await {
474            Ok(value) => {
475                let mut guard = circuit.lock().await;
476                guard.on_success();
477                return Ok(value);
478            }
479            Err(err) => {
480                let retryable = is_retryable(&err);
481                {
482                    let mut guard = circuit.lock().await;
483                    guard.on_failure();
484                }
485
486                if retryable && attempt < MAX_DIRECT_LINE_RETRIES {
487                    attempt = attempt.saturating_add(1);
488                    if let DirectLineError::Remote {
489                        retry_after: Some(delay),
490                        ..
491                    } = &err
492                    {
493                        tokio::time::sleep(*delay).await;
494                    } else {
495                        backoff::sleep(attempt).await;
496                    }
497                    continue;
498                }
499
500                return Err(err);
501            }
502        }
503    }
504}
505
506fn log_directline_error(action: &str, err: &DirectLineError) {
507    match err {
508        DirectLineError::Remote {
509            status,
510            retry_after,
511            ..
512        } => {
513            let status_label = status.as_str().to_string();
514            let endpoint_label = action.to_string();
515            warn!(%action, %status, retry_after = retry_after.map(|d| d.as_secs()), "direct line remote error");
516            counter!(
517                "webchat_errors_total",
518                "kind" => "directline_remote",
519                "endpoint" => endpoint_label,
520                "status" => status_label
521            )
522            .increment(1);
523        }
524        DirectLineError::Transport(source) => {
525            warn!(%action, error = %source, "direct line transport error");
526            counter!(
527                "webchat_errors_total",
528                "kind" => "directline_transport",
529                "endpoint" => action.to_string()
530            )
531            .increment(1);
532        }
533        DirectLineError::Decode(_) => {
534            warn!(%action, "direct line decode error");
535            counter!(
536                "webchat_errors_total",
537                "kind" => "directline_decode",
538                "endpoint" => action.to_string()
539            )
540            .increment(1);
541        }
542        DirectLineError::Config(_) => {
543            warn!(%action, "direct line configuration error");
544            counter!(
545                "webchat_errors_total",
546                "kind" => "directline_config",
547                "endpoint" => action.to_string()
548            )
549            .increment(1);
550        }
551    }
552}
553
554const ALLOWED_ATTACHMENT_TYPES: &[&str] = &[
555    "application/vnd.microsoft.card.adaptive",
556    "application/json",
557    "image/png",
558    "image/jpeg",
559    "image/gif",
560];
561
562const MAX_ATTACHMENT_BYTES: usize = 512 * 1024;
563
564fn validate_activity_for_post(activity: &Value) -> Result<(), WebChatError> {
565    if let Some(attachments) = activity.get("attachments").and_then(Value::as_array) {
566        for attachment in attachments {
567            let content_type = attachment
568                .get("contentType")
569                .or_else(|| attachment.get("content_type"))
570                .and_then(Value::as_str)
571                .ok_or(WebChatError::BadRequest("attachment missing contentType"))?;
572
573            let allowed = ALLOWED_ATTACHMENT_TYPES
574                .iter()
575                .any(|allowed| content_type.eq_ignore_ascii_case(allowed));
576            if !allowed {
577                return Err(WebChatError::BadRequest(
578                    "attachment content type not allowed",
579                ));
580            }
581
582            if let Some(content) = attachment.get("content") {
583                let serialized = serde_json::to_vec(content).map_err(|_| {
584                    WebChatError::BadRequest("attachment content is not valid JSON")
585                })?;
586                if serialized.len() > MAX_ATTACHMENT_BYTES {
587                    return Err(WebChatError::BadRequest(
588                        "attachment content exceeds size limit",
589                    ));
590                }
591            }
592        }
593    }
594
595    Ok(())
596}
597
598#[derive(Deserialize)]
599pub struct AdminPath {
600    pub env: String,
601    pub tenant: String,
602}
603
604#[derive(Deserialize)]
605pub struct AdminPostActivityRequest {
606    #[serde(default)]
607    pub team: Option<String>,
608    #[serde(rename = "conversation_id", default)]
609    pub conversation_id: Option<String>,
610    pub activity: Value,
611}
612
613#[derive(Serialize)]
614pub struct AdminPostActivityResponse {
615    pub posted: usize,
616    pub skipped: usize,
617}
618
619pub async fn admin_post_activity(
620    State(state): State<AppState>,
621    Path(path): Path<AdminPath>,
622    Json(body): Json<AdminPostActivityRequest>,
623) -> Result<Json<AdminPostActivityResponse>, WebChatError> {
624    let AdminPostActivityRequest {
625        team,
626        conversation_id,
627        activity,
628    } = body;
629
630    if !activity.is_object() {
631        return Err(WebChatError::BadRequest("activity must be an object"));
632    }
633
634    let team_filter = team.as_deref();
635
636    if let Some(conversation_id) = conversation_id.as_deref() {
637        let session = state
638            .sessions
639            .get(conversation_id)
640            .await
641            .map_err(WebChatError::Internal)?
642            .ok_or(WebChatError::NotFound("conversation not found"))?;
643
644        if !session
645            .tenant_ctx
646            .env
647            .as_ref()
648            .eq_ignore_ascii_case(&path.env)
649            || !session
650                .tenant_ctx
651                .tenant
652                .as_ref()
653                .eq_ignore_ascii_case(&path.tenant)
654        {
655            return Err(WebChatError::NotFound("conversation not found"));
656        }
657
658        if let Some(team) = team_filter
659            && !session
660                .tenant_ctx
661                .team
662                .as_ref()
663                .map(|value| value.as_ref().eq_ignore_ascii_case(team))
664                .unwrap_or(false)
665        {
666            return Err(WebChatError::NotFound("conversation not found"));
667        }
668
669        return append_and_broadcast(&state, &session, activity)
670            .await
671            .map(|posted| Json(AdminPostActivityResponse { posted, skipped: 0 }));
672    }
673
674    let sessions = state
675        .sessions
676        .list_by_tenant(&path.env, &path.tenant, team_filter)
677        .await
678        .map_err(WebChatError::Internal)?;
679
680    if sessions.is_empty() {
681        return Err(WebChatError::NotFound("no matching sessions"));
682    }
683
684    let mut posted = 0usize;
685    let mut skipped = 0usize;
686    for session in sessions {
687        match append_and_broadcast(&state, &session, activity.clone()).await {
688            Ok(count) => posted += count,
689            Err(WebChatError::BadRequest(_)) => {
690                skipped += 1;
691                continue;
692            }
693            Err(other) => return Err(other),
694        }
695    }
696
697    Ok(Json(AdminPostActivityResponse { posted, skipped }))
698}
699
700async fn append_and_broadcast(
701    state: &AppState,
702    session: &WebchatSession,
703    activity_json: Value,
704) -> Result<usize, WebChatError> {
705    if !session.proactive_ok {
706        return Err(WebChatError::BadRequest("proactive messaging disabled"));
707    }
708    let mut activity: Activity = serde_json::from_value(activity_json)
709        .map_err(|_| WebChatError::BadRequest("activity must match Bot Framework schema"))?;
710    apply_bot_defaults(&mut activity, &session.conversation_id);
711
712    let stored = match state
713        .conversations
714        .append(&session.conversation_id, activity.clone())
715        .await
716    {
717        Ok(stored) => stored,
718        Err(StoreError::NotFound(_)) => {
719            state
720                .conversations
721                .create(&session.conversation_id, session.tenant_ctx.clone())
722                .await
723                .map_err(|err| WebChatError::Internal(err.into()))?;
724            state
725                .conversations
726                .append(&session.conversation_id, activity)
727                .await
728                .map_err(|err| WebChatError::Internal(err.into()))?
729        }
730        Err(StoreError::QuotaExceeded(_)) => {
731            return Err(WebChatError::BadRequest(
732                "conversation backlog quota exceeded",
733            ));
734        }
735        Err(err) => return Err(WebChatError::Internal(err.into())),
736    };
737
738    if let Err(err) = state
739        .sessions
740        .update_watermark(
741            &session.conversation_id,
742            Some((stored.watermark + 1).to_string()),
743        )
744        .await
745    {
746        warn!(error = %err, "failed to update watermark");
747    }
748
749    Ok(1)
750}
751
752fn apply_bot_defaults(activity: &mut Activity, conversation_id: &str) {
753    match activity.from.as_mut() {
754        Some(from) => {
755            if from.id.trim().is_empty() {
756                from.id = "bot".into();
757            }
758            from.role = Some("bot".into());
759        }
760        None => {
761            activity.from = Some(ChannelAccount {
762                id: "bot".into(),
763                name: None,
764                role: Some("bot".into()),
765            });
766        }
767    }
768    if activity
769        .conversation
770        .as_ref()
771        .map(|conv| conv.id.trim().is_empty())
772        .unwrap_or(true)
773    {
774        activity.conversation = Some(ConversationAccount {
775            id: conversation_id.to_string(),
776        });
777    }
778}
779
780pub type SharedDirectLinePoster = Arc<dyn DirectLinePoster>;
781
782#[async_trait]
783pub trait DirectLinePoster: Send + Sync {
784    async fn post_activity(
785        &self,
786        base_url: &str,
787        conversation_id: &str,
788        bearer_token: &str,
789        activity: Value,
790    ) -> Result<(), DirectLineError>;
791}
792
793pub struct HttpDirectLinePoster {
794    client: Client,
795}
796
797impl HttpDirectLinePoster {
798    pub fn new(client: Client) -> Self {
799        Self { client }
800    }
801}
802
803#[async_trait]
804impl DirectLinePoster for HttpDirectLinePoster {
805    async fn post_activity(
806        &self,
807        base_url: &str,
808        conversation_id: &str,
809        bearer_token: &str,
810        activity: Value,
811    ) -> Result<(), DirectLineError> {
812        let url = format!(
813            "{}/conversations/{}/activities",
814            base_url.trim_end_matches('/'),
815            conversation_id
816        );
817
818        let response = self
819            .client
820            .post(url)
821            .bearer_auth(bearer_token)
822            .json(&activity)
823            .send()
824            .await
825            .map_err(DirectLineError::Transport)?;
826
827        let status = response.status();
828        if status.is_success() {
829            return Ok(());
830        }
831
832        let retry_after = response
833            .headers()
834            .get(axum::http::header::RETRY_AFTER)
835            .and_then(|value| value.to_str().ok())
836            .and_then(|value| value.parse::<u64>().ok())
837            .map(std::time::Duration::from_secs);
838        let body = response
839            .text()
840            .await
841            .unwrap_or_else(|_| "<unreadable>".to_string());
842        let message = if body.len() > 512 {
843            body[..512].to_string()
844        } else {
845            body
846        };
847
848        Err(DirectLineError::Remote {
849            status,
850            retry_after,
851            message,
852        })
853    }
854}
855
856#[cfg(test)]
857mod tests {
858    use super::*;
859
860    #[tokio::test]
861    async fn healthz_returns_no_content() {
862        assert_eq!(healthz().await, StatusCode::NO_CONTENT);
863    }
864}