gsm_core/platforms/webchat/
http.rs

1use std::{collections::HashMap, future::Future, sync::Arc};
2
3use axum::http::StatusCode;
4use axum::{
5    Json, Router,
6    extract::{Path, State},
7    routing::{get, post},
8};
9use metrics::counter;
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use tracing::warn;
13
14use super::conversation::SharedConversationStore;
15#[cfg(feature = "directline_standalone")]
16use super::conversation::{Activity, ChannelAccount, ConversationAccount, StoreError, noop_store};
17use super::{
18    WebChatProvider, backoff,
19    bus::{NoopBus, SharedBus},
20    circuit::{CircuitBreaker, CircuitLabels, CircuitSettings},
21    directline_client::{ConversationResponse, DirectLineApi, DirectLineError, TokenResponse},
22    error::WebChatError,
23    ingress::{
24        IngressCtx, IngressDeps, ReqwestActivitiesTransport, SharedActivitiesTransport,
25        run_poll_loop,
26    },
27    oauth::{GreenticOauthClient, ReqwestGreenticOauthClient},
28    provider::RouteContext,
29    session::{MemorySessionStore, SharedSessionStore, WebchatSession},
30    telemetry,
31};
32use async_trait::async_trait;
33use greentic_types::{EnvId, TeamId, TenantCtx, TenantId};
34use reqwest::Client;
35use tokio::{spawn, sync::Mutex as AsyncMutex};
36
37pub struct AppState {
38    pub provider: WebChatProvider,
39    pub direct_line: Arc<dyn DirectLineApi>,
40    pub http_client: Client,
41    pub transport: SharedActivitiesTransport,
42    pub bus: SharedBus,
43    pub sessions: SharedSessionStore,
44    pub activity_poster: SharedDirectLinePoster,
45    pub oauth_client: Arc<dyn GreenticOauthClient>,
46    #[cfg(feature = "directline_standalone")]
47    pub conversations: SharedConversationStore,
48    circuit_settings: CircuitSettings,
49    token_circuits: CircuitRegistry,
50    conversation_circuits: CircuitRegistry,
51}
52
53impl Clone for AppState {
54    fn clone(&self) -> Self {
55        Self {
56            provider: self.provider.clone(),
57            direct_line: Arc::clone(&self.direct_line),
58            http_client: self.http_client.clone(),
59            transport: Arc::clone(&self.transport),
60            bus: Arc::clone(&self.bus),
61            sessions: Arc::clone(&self.sessions),
62            activity_poster: Arc::clone(&self.activity_poster),
63            oauth_client: Arc::clone(&self.oauth_client),
64            #[cfg(feature = "directline_standalone")]
65            conversations: Arc::clone(&self.conversations),
66            circuit_settings: self.circuit_settings.clone(),
67            token_circuits: self.token_circuits.clone(),
68            conversation_circuits: self.conversation_circuits.clone(),
69        }
70    }
71}
72
73impl AppState {
74    pub fn new(
75        provider: WebChatProvider,
76        direct_line: Arc<dyn DirectLineApi>,
77        http_client: Client,
78    ) -> Self {
79        let transport_client = http_client.clone();
80        let poster_client = http_client.clone();
81        let oauth_client_http = http_client.clone();
82        let transport: SharedActivitiesTransport =
83            Arc::new(ReqwestActivitiesTransport::new(transport_client));
84        let activity_poster: SharedDirectLinePoster =
85            Arc::new(HttpDirectLinePoster::new(poster_client));
86        let oauth_client: Arc<dyn GreenticOauthClient> =
87            Arc::new(ReqwestGreenticOauthClient::new(oauth_client_http));
88        let circuit_settings = CircuitSettings::default();
89        Self {
90            provider,
91            direct_line,
92            http_client,
93            transport,
94            bus: Arc::new(NoopBus),
95            sessions: Arc::new(MemorySessionStore::default()),
96            activity_poster,
97            oauth_client,
98            #[cfg(feature = "directline_standalone")]
99            conversations: noop_store(),
100            circuit_settings: circuit_settings.clone(),
101            token_circuits: CircuitRegistry::new(circuit_settings.clone()),
102            conversation_circuits: CircuitRegistry::new(circuit_settings),
103        }
104    }
105
106    pub fn with_bus(mut self, bus: SharedBus) -> Self {
107        self.bus = bus;
108        self
109    }
110
111    pub fn with_sessions(mut self, sessions: SharedSessionStore) -> Self {
112        self.sessions = sessions;
113        self
114    }
115
116    pub fn with_transport(mut self, transport: SharedActivitiesTransport) -> Self {
117        self.transport = transport;
118        self
119    }
120
121    pub fn with_activity_poster(mut self, poster: SharedDirectLinePoster) -> Self {
122        self.activity_poster = poster;
123        self
124    }
125
126    pub fn with_oauth_client(mut self, client: Arc<dyn GreenticOauthClient>) -> Self {
127        self.oauth_client = client;
128        self
129    }
130
131    #[cfg(feature = "directline_standalone")]
132    pub fn with_conversations(mut self, conversations: SharedConversationStore) -> Self {
133        self.conversations = conversations;
134        self
135    }
136
137    #[cfg(not(feature = "directline_standalone"))]
138    pub fn with_conversations(self, _conversations: SharedConversationStore) -> Self {
139        self
140    }
141
142    pub async fn post_activity(
143        &self,
144        conversation_id: &str,
145        bearer_token: &str,
146        activity: Value,
147    ) -> Result<(), WebChatError> {
148        validate_activity_for_post(&activity)?;
149        self.activity_poster
150            .post_activity(
151                self.provider.config().direct_line_base(),
152                conversation_id,
153                bearer_token,
154                activity,
155            )
156            .await
157            .map_err(WebChatError::from)
158    }
159}
160
161pub fn router(state: AppState) -> Router<AppState> {
162    Router::new()
163        .route("/webchat/healthz", get(healthz))
164        .route(
165            "/webchat/{env}/{tenant}/tokens/generate",
166            post(generate_token),
167        )
168        .route(
169            "/webchat/{env}/{tenant}/{team}/tokens/generate",
170            post(generate_token),
171        )
172        .route(
173            "/webchat/{env}/{tenant}/conversations/start",
174            post(start_conversation),
175        )
176        .route(
177            "/webchat/{env}/{tenant}/{team}/conversations/start",
178            post(start_conversation),
179        )
180        .route(
181            "/webchat/admin/{env}/{tenant}/post-activity",
182            post(admin_post_activity),
183        )
184        .route("/webchat/oauth/start", get(super::oauth::start))
185        .route("/webchat/oauth/callback", get(super::oauth::callback))
186        .with_state(state)
187}
188
189async fn healthz() -> StatusCode {
190    StatusCode::NO_CONTENT
191}
192
193#[derive(Deserialize)]
194struct TokenPath {
195    env: String,
196    tenant: String,
197    #[serde(default)]
198    team: Option<String>,
199}
200
201#[derive(Default, Deserialize)]
202struct GenerateTokenRequestBody {}
203
204#[derive(Serialize)]
205struct GenerateTokenResponse {
206    token: String,
207    expires_in: u64,
208}
209
210#[derive(Deserialize)]
211struct StartConversationRequest {
212    token: String,
213}
214
215#[derive(Serialize)]
216struct StartConversationResponse {
217    token: String,
218    #[serde(rename = "conversationId")]
219    conversation_id: String,
220    #[serde(rename = "streamUrl", skip_serializing_if = "Option::is_none")]
221    stream_url: Option<String>,
222    expires_in: u64,
223}
224
225async fn generate_token(
226    State(state): State<AppState>,
227    Path(path): Path<TokenPath>,
228    Json(_): Json<GenerateTokenRequestBody>,
229) -> Result<Json<GenerateTokenResponse>, WebChatError> {
230    let ctx = RouteContext::new(path.env, path.tenant, path.team);
231    let span = telemetry::span_for("tokens.generate", &ctx);
232    let _guard = span.enter();
233    let team_label = telemetry::team_or_dash(ctx.team());
234    let env_metric = ctx.env().to_string();
235    let tenant_metric = ctx.tenant().to_string();
236    let team_metric = team_label.to_string();
237    let tenant_ctx = build_tenant_ctx(&ctx);
238
239    let secret = state
240        .provider
241        .direct_line_secret(&tenant_ctx)
242        .await
243        .map_err(|err| {
244            counter!(
245                "webchat_errors_total",
246                "kind" => "secret_backend_error",
247                "env" => env_metric.clone(),
248                "tenant" => tenant_metric.clone(),
249                "team" => team_metric.clone()
250            )
251            .increment(1);
252            WebChatError::Internal(err)
253        })?
254        .ok_or_else(|| {
255            counter!(
256                "webchat_errors_total",
257                "kind" => "missing_secret",
258                "env" => env_metric.clone(),
259                "tenant" => tenant_metric.clone(),
260                "team" => team_metric.clone()
261            )
262            .increment(1);
263            WebChatError::MissingSecret
264        })?;
265
266    let secret = Arc::new(secret);
267    let response = call_with_circuit(
268        &state.token_circuits,
269        direct_line_circuit_key("tokens", &ctx),
270        direct_line_labels(&ctx),
271        {
272            let direct_line = Arc::clone(&state.direct_line);
273            let secret = Arc::clone(&secret);
274            move || {
275                let direct_line = Arc::clone(&direct_line);
276                let secret = Arc::clone(&secret);
277                async move { direct_line.generate_token(secret.as_ref()).await }
278            }
279        },
280    )
281    .await
282    .map_err(|err| map_directline_error("tokens.generate", err))?;
283
284    counter!(
285        "webchat_tokens_generated_total",
286        "env" => env_metric.clone(),
287        "tenant" => tenant_metric.clone(),
288        "team" => team_metric.clone()
289    )
290    .increment(1);
291
292    Ok(Json(token_response_body(response)))
293}
294
295async fn start_conversation(
296    State(state): State<AppState>,
297    Path(path): Path<TokenPath>,
298    Json(body): Json<StartConversationRequest>,
299) -> Result<Json<StartConversationResponse>, WebChatError> {
300    let ctx = RouteContext::new(path.env, path.tenant, path.team);
301    let span = telemetry::span_for("conversations.start", &ctx);
302    let _guard = span.enter();
303    let env_metric = ctx.env().to_string();
304    let tenant_metric = ctx.tenant().to_string();
305    let team_metric = telemetry::team_or_dash(ctx.team()).to_string();
306
307    let trimmed = body.token.trim();
308    if trimmed.is_empty() {
309        return Err(WebChatError::BadRequest("token is required"));
310    }
311
312    let token_for_retry = Arc::new(trimmed.to_string());
313    let conversation_response = call_with_circuit(
314        &state.conversation_circuits,
315        direct_line_circuit_key("conversations", &ctx),
316        direct_line_labels(&ctx),
317        {
318            let direct_line = Arc::clone(&state.direct_line);
319            let token = Arc::clone(&token_for_retry);
320            move || {
321                let direct_line = Arc::clone(&direct_line);
322                let token = Arc::clone(&token);
323                async move { direct_line.start_conversation(token.as_ref()).await }
324            }
325        },
326    )
327    .await
328    .map_err(|err| map_directline_error("conversations.start", err))?;
329
330    let tenant_ctx = build_tenant_ctx(&ctx);
331    state
332        .sessions
333        .upsert(WebchatSession::new(
334            conversation_response.conversation_id.clone(),
335            tenant_ctx.clone(),
336            conversation_response.token.clone(),
337        ))
338        .await
339        .map_err(WebChatError::Internal)?;
340
341    counter!(
342        "webchat_conversations_started_total",
343        "env" => env_metric.clone(),
344        "tenant" => tenant_metric.clone(),
345        "team" => team_metric.clone()
346    )
347    .increment(1);
348    let ingress_deps = IngressDeps {
349        bus: Arc::clone(&state.bus),
350        sessions: Arc::clone(&state.sessions),
351        dl_base: state.provider.config().direct_line_base().to_string(),
352        transport: Arc::clone(&state.transport),
353        circuit: state.circuit_settings.clone(),
354    };
355    let ingress_ctx = IngressCtx {
356        tenant_ctx,
357        conversation_id: conversation_response.conversation_id.clone(),
358        token: conversation_response.token.clone(),
359    };
360
361    spawn(async move {
362        if let Err(err) = run_poll_loop(ingress_deps, ingress_ctx).await {
363            warn!(error = %err, "webchat poll loop terminated");
364        }
365    });
366
367    Ok(Json(conversation_response_body(conversation_response)))
368}
369
370fn token_response_body(response: TokenResponse) -> GenerateTokenResponse {
371    GenerateTokenResponse {
372        token: response.token,
373        expires_in: response.expires_in.unwrap_or(DEFAULT_EXPIRY_SECONDS),
374    }
375}
376
377fn conversation_response_body(response: ConversationResponse) -> StartConversationResponse {
378    StartConversationResponse {
379        token: response.token,
380        conversation_id: response.conversation_id,
381        stream_url: response.stream_url,
382        expires_in: response.expires_in.unwrap_or(DEFAULT_EXPIRY_SECONDS),
383    }
384}
385
386const DEFAULT_EXPIRY_SECONDS: u64 = 1800;
387
388fn build_tenant_ctx(ctx: &RouteContext) -> TenantCtx {
389    let mut tenant_ctx = TenantCtx::new(
390        EnvId(ctx.env().to_string()),
391        TenantId(ctx.tenant().to_string()),
392    );
393    if let Some(team) = ctx.team() {
394        let team_id = TeamId(team.to_string());
395        tenant_ctx = tenant_ctx.with_team(Some(team_id));
396    }
397    tenant_ctx
398}
399
400fn map_directline_error(action: &str, err: DirectLineError) -> WebChatError {
401    log_directline_error(action, &err);
402    WebChatError::from(err)
403}
404
405const MAX_DIRECT_LINE_RETRIES: u32 = 5;
406const NO_CONVERSATION_LABEL: &str = "-";
407
408#[derive(Clone)]
409struct CircuitRegistry {
410    settings: CircuitSettings,
411    inner: Arc<AsyncMutex<HashMap<String, Arc<AsyncMutex<CircuitBreaker>>>>>,
412}
413
414impl CircuitRegistry {
415    fn new(settings: CircuitSettings) -> Self {
416        Self {
417            settings,
418            inner: Arc::new(AsyncMutex::new(HashMap::new())),
419        }
420    }
421
422    async fn circuit(&self, key: &str, labels: CircuitLabels) -> Arc<AsyncMutex<CircuitBreaker>> {
423        let mut guard = self.inner.lock().await;
424        if let Some(existing) = guard.get(key) {
425            existing.clone()
426        } else {
427            let circuit = Arc::new(AsyncMutex::new(CircuitBreaker::new(
428                self.settings.clone(),
429                labels,
430            )));
431            guard.insert(key.to_string(), circuit.clone());
432            circuit
433        }
434    }
435}
436
437fn direct_line_circuit_key(scope: &str, ctx: &RouteContext) -> String {
438    let team = telemetry::team_or_dash(ctx.team());
439    let team_slug = team.to_ascii_lowercase();
440    format!(
441        "{scope}:{env}:{tenant}:{team}",
442        scope = scope,
443        env = ctx.env().to_ascii_lowercase(),
444        tenant = ctx.tenant().to_ascii_lowercase(),
445        team = team_slug
446    )
447}
448
449fn direct_line_labels(ctx: &RouteContext) -> CircuitLabels {
450    CircuitLabels::new(
451        ctx.env().to_string(),
452        ctx.tenant().to_string(),
453        telemetry::team_or_dash(ctx.team()).to_string(),
454        NO_CONVERSATION_LABEL.to_string(),
455    )
456}
457
458fn is_retryable(err: &DirectLineError) -> bool {
459    matches!(err, DirectLineError::Transport(_))
460        || matches!(err, DirectLineError::Remote { status, .. }
461            if *status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error())
462}
463
464async fn call_with_circuit<F, Fut, T>(
465    registry: &CircuitRegistry,
466    key: String,
467    labels: CircuitLabels,
468    mut operation: F,
469) -> Result<T, DirectLineError>
470where
471    F: FnMut() -> Fut,
472    Fut: Future<Output = Result<T, DirectLineError>>,
473{
474    let circuit = registry.circuit(&key, labels).await;
475    let mut attempt: u32 = 0;
476    loop {
477        {
478            let mut guard = circuit.lock().await;
479            guard.before_request().await;
480        }
481
482        match operation().await {
483            Ok(value) => {
484                let mut guard = circuit.lock().await;
485                guard.on_success();
486                return Ok(value);
487            }
488            Err(err) => {
489                let retryable = is_retryable(&err);
490                {
491                    let mut guard = circuit.lock().await;
492                    guard.on_failure();
493                }
494
495                if retryable && attempt < MAX_DIRECT_LINE_RETRIES {
496                    attempt = attempt.saturating_add(1);
497                    if let DirectLineError::Remote {
498                        retry_after: Some(delay),
499                        ..
500                    } = &err
501                    {
502                        tokio::time::sleep(*delay).await;
503                    } else {
504                        backoff::sleep(attempt).await;
505                    }
506                    continue;
507                }
508
509                return Err(err);
510            }
511        }
512    }
513}
514
515fn log_directline_error(action: &str, err: &DirectLineError) {
516    match err {
517        DirectLineError::Remote {
518            status,
519            retry_after,
520            ..
521        } => {
522            let status_label = status.as_str().to_string();
523            let endpoint_label = action.to_string();
524            warn!(%action, %status, retry_after = retry_after.map(|d| d.as_secs()), "direct line remote error");
525            counter!(
526                "webchat_errors_total",
527                "kind" => "directline_remote",
528                "endpoint" => endpoint_label,
529                "status" => status_label
530            )
531            .increment(1);
532        }
533        DirectLineError::Transport(source) => {
534            warn!(%action, error = %source, "direct line transport error");
535            counter!(
536                "webchat_errors_total",
537                "kind" => "directline_transport",
538                "endpoint" => action.to_string()
539            )
540            .increment(1);
541        }
542        DirectLineError::Decode(_) => {
543            warn!(%action, "direct line decode error");
544            counter!(
545                "webchat_errors_total",
546                "kind" => "directline_decode",
547                "endpoint" => action.to_string()
548            )
549            .increment(1);
550        }
551        DirectLineError::Config(_) => {
552            warn!(%action, "direct line configuration error");
553            counter!(
554                "webchat_errors_total",
555                "kind" => "directline_config",
556                "endpoint" => action.to_string()
557            )
558            .increment(1);
559        }
560    }
561}
562
563const ALLOWED_ATTACHMENT_TYPES: &[&str] = &[
564    "application/vnd.microsoft.card.adaptive",
565    "application/json",
566    "image/png",
567    "image/jpeg",
568    "image/gif",
569];
570
571const MAX_ATTACHMENT_BYTES: usize = 512 * 1024;
572
573fn validate_activity_for_post(activity: &Value) -> Result<(), WebChatError> {
574    if let Some(attachments) = activity.get("attachments").and_then(Value::as_array) {
575        for attachment in attachments {
576            let content_type = attachment
577                .get("contentType")
578                .or_else(|| attachment.get("content_type"))
579                .and_then(Value::as_str)
580                .ok_or(WebChatError::BadRequest("attachment missing contentType"))?;
581
582            let allowed = ALLOWED_ATTACHMENT_TYPES
583                .iter()
584                .any(|allowed| content_type.eq_ignore_ascii_case(allowed));
585            if !allowed {
586                return Err(WebChatError::BadRequest(
587                    "attachment content type not allowed",
588                ));
589            }
590
591            if let Some(content) = attachment.get("content") {
592                let serialized = serde_json::to_vec(content).map_err(|_| {
593                    WebChatError::BadRequest("attachment content is not valid JSON")
594                })?;
595                if serialized.len() > MAX_ATTACHMENT_BYTES {
596                    return Err(WebChatError::BadRequest(
597                        "attachment content exceeds size limit",
598                    ));
599                }
600            }
601        }
602    }
603
604    Ok(())
605}
606
607#[derive(Deserialize)]
608pub struct AdminPath {
609    pub env: String,
610    pub tenant: String,
611}
612
613#[derive(Deserialize)]
614pub struct AdminPostActivityRequest {
615    #[serde(default)]
616    pub team: Option<String>,
617    #[serde(rename = "conversation_id", default)]
618    pub conversation_id: Option<String>,
619    pub activity: Value,
620}
621
622#[derive(Serialize)]
623pub struct AdminPostActivityResponse {
624    pub posted: usize,
625    pub skipped: usize,
626}
627
628pub async fn admin_post_activity(
629    State(state): State<AppState>,
630    Path(path): Path<AdminPath>,
631    Json(body): Json<AdminPostActivityRequest>,
632) -> Result<Json<AdminPostActivityResponse>, WebChatError> {
633    let AdminPostActivityRequest {
634        team,
635        conversation_id,
636        activity,
637    } = body;
638
639    if !activity.is_object() {
640        return Err(WebChatError::BadRequest("activity must be an object"));
641    }
642
643    let team_filter = team.as_deref();
644
645    if let Some(conversation_id) = conversation_id.as_deref() {
646        let session = state
647            .sessions
648            .get(conversation_id)
649            .await
650            .map_err(WebChatError::Internal)?
651            .ok_or(WebChatError::NotFound("conversation not found"))?;
652
653        if !session
654            .tenant_ctx
655            .env
656            .as_ref()
657            .eq_ignore_ascii_case(&path.env)
658            || !session
659                .tenant_ctx
660                .tenant
661                .as_ref()
662                .eq_ignore_ascii_case(&path.tenant)
663        {
664            return Err(WebChatError::NotFound("conversation not found"));
665        }
666
667        if let Some(team) = team_filter
668            && !session
669                .tenant_ctx
670                .team
671                .as_ref()
672                .map(|value| value.as_ref().eq_ignore_ascii_case(team))
673                .unwrap_or(false)
674        {
675            return Err(WebChatError::NotFound("conversation not found"));
676        }
677
678        return append_and_broadcast(&state, &session, activity)
679            .await
680            .map(|posted| Json(AdminPostActivityResponse { posted, skipped: 0 }));
681    }
682
683    let sessions = state
684        .sessions
685        .list_by_tenant(&path.env, &path.tenant, team_filter)
686        .await
687        .map_err(WebChatError::Internal)?;
688
689    if sessions.is_empty() {
690        return Err(WebChatError::NotFound("no matching sessions"));
691    }
692
693    let mut posted = 0usize;
694    let mut skipped = 0usize;
695    for session in sessions {
696        match append_and_broadcast(&state, &session, activity.clone()).await {
697            Ok(count) => posted += count,
698            Err(WebChatError::BadRequest(_)) => {
699                skipped += 1;
700                continue;
701            }
702            Err(other) => return Err(other),
703        }
704    }
705
706    Ok(Json(AdminPostActivityResponse { posted, skipped }))
707}
708
709#[cfg(feature = "directline_standalone")]
710async fn append_and_broadcast(
711    state: &AppState,
712    session: &WebchatSession,
713    activity_json: Value,
714) -> Result<usize, WebChatError> {
715    if !session.proactive_ok {
716        return Err(WebChatError::BadRequest("proactive messaging disabled"));
717    }
718    let mut activity: Activity = serde_json::from_value(activity_json)
719        .map_err(|_| WebChatError::BadRequest("activity must match Bot Framework schema"))?;
720    apply_bot_defaults(&mut activity, &session.conversation_id);
721
722    let stored = match state
723        .conversations
724        .append(&session.conversation_id, activity.clone())
725        .await
726    {
727        Ok(stored) => stored,
728        Err(StoreError::NotFound(_)) => {
729            state
730                .conversations
731                .create(&session.conversation_id, session.tenant_ctx.clone())
732                .await
733                .map_err(|err| WebChatError::Internal(err.into()))?;
734            state
735                .conversations
736                .append(&session.conversation_id, activity)
737                .await
738                .map_err(|err| WebChatError::Internal(err.into()))?
739        }
740        Err(StoreError::QuotaExceeded(_)) => {
741            return Err(WebChatError::BadRequest(
742                "conversation backlog quota exceeded",
743            ));
744        }
745        Err(err) => return Err(WebChatError::Internal(err.into())),
746    };
747
748    if let Err(err) = state
749        .sessions
750        .update_watermark(
751            &session.conversation_id,
752            Some((stored.watermark + 1).to_string()),
753        )
754        .await
755    {
756        warn!(error = %err, "failed to update watermark");
757    }
758
759    Ok(1)
760}
761
762#[cfg(feature = "directline_standalone")]
763fn apply_bot_defaults(activity: &mut Activity, conversation_id: &str) {
764    match activity.from.as_mut() {
765        Some(from) => {
766            if from.id.trim().is_empty() {
767                from.id = "bot".into();
768            }
769            from.role = Some("bot".into());
770        }
771        None => {
772            activity.from = Some(ChannelAccount {
773                id: "bot".into(),
774                name: None,
775                role: Some("bot".into()),
776            });
777        }
778    }
779    if activity
780        .conversation
781        .as_ref()
782        .map(|conv| conv.id.trim().is_empty())
783        .unwrap_or(true)
784    {
785        activity.conversation = Some(ConversationAccount {
786            id: conversation_id.to_string(),
787        });
788    }
789}
790
791#[cfg(not(feature = "directline_standalone"))]
792async fn append_and_broadcast(
793    state: &AppState,
794    session: &WebchatSession,
795    activity: Value,
796) -> Result<usize, WebChatError> {
797    if !session.proactive_ok {
798        return Err(WebChatError::BadRequest("proactive messaging disabled"));
799    }
800    state
801        .post_activity(&session.conversation_id, &session.bearer_token, activity)
802        .await?;
803    Ok(1)
804}
805
806pub type SharedDirectLinePoster = Arc<dyn DirectLinePoster>;
807
808#[async_trait]
809pub trait DirectLinePoster: Send + Sync {
810    async fn post_activity(
811        &self,
812        base_url: &str,
813        conversation_id: &str,
814        bearer_token: &str,
815        activity: Value,
816    ) -> Result<(), DirectLineError>;
817}
818
819pub struct HttpDirectLinePoster {
820    client: Client,
821}
822
823impl HttpDirectLinePoster {
824    pub fn new(client: Client) -> Self {
825        Self { client }
826    }
827}
828
829#[async_trait]
830impl DirectLinePoster for HttpDirectLinePoster {
831    async fn post_activity(
832        &self,
833        base_url: &str,
834        conversation_id: &str,
835        bearer_token: &str,
836        activity: Value,
837    ) -> Result<(), DirectLineError> {
838        let url = format!(
839            "{}/conversations/{}/activities",
840            base_url.trim_end_matches('/'),
841            conversation_id
842        );
843
844        let response = self
845            .client
846            .post(url)
847            .bearer_auth(bearer_token)
848            .json(&activity)
849            .send()
850            .await
851            .map_err(DirectLineError::Transport)?;
852
853        let status = response.status();
854        if status.is_success() {
855            return Ok(());
856        }
857
858        let retry_after = response
859            .headers()
860            .get(axum::http::header::RETRY_AFTER)
861            .and_then(|value| value.to_str().ok())
862            .and_then(|value| value.parse::<u64>().ok())
863            .map(std::time::Duration::from_secs);
864        let body = response
865            .text()
866            .await
867            .unwrap_or_else(|_| "<unreadable>".to_string());
868        let message = if body.len() > 512 {
869            body[..512].to_string()
870        } else {
871            body
872        };
873
874        Err(DirectLineError::Remote {
875            status,
876            retry_after,
877            message,
878        })
879    }
880}
881
882#[cfg(test)]
883mod tests {
884    use super::*;
885
886    #[tokio::test]
887    async fn healthz_returns_no_content() {
888        assert_eq!(healthz().await, StatusCode::NO_CONTENT);
889    }
890}