1use std::{collections::HashMap, future::Future, sync::Arc};
2
3use axum::http::StatusCode;
4use axum::{
5 Json, Router,
6 extract::{Path, State},
7 routing::{get, post},
8};
9use metrics::counter;
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use tracing::warn;
13
14use super::conversation::SharedConversationStore;
15#[cfg(feature = "directline_standalone")]
16use super::conversation::{Activity, ChannelAccount, ConversationAccount, StoreError, noop_store};
17use super::{
18 WebChatProvider, backoff,
19 bus::{NoopBus, SharedBus},
20 circuit::{CircuitBreaker, CircuitLabels, CircuitSettings},
21 directline_client::{ConversationResponse, DirectLineApi, DirectLineError, TokenResponse},
22 error::WebChatError,
23 ingress::{
24 IngressCtx, IngressDeps, ReqwestActivitiesTransport, SharedActivitiesTransport,
25 run_poll_loop,
26 },
27 oauth::{GreenticOauthClient, ReqwestGreenticOauthClient},
28 provider::RouteContext,
29 session::{MemorySessionStore, SharedSessionStore, WebchatSession},
30 telemetry,
31};
32use async_trait::async_trait;
33use greentic_types::{EnvId, TeamId, TenantCtx, TenantId};
34use reqwest::Client;
35use tokio::{spawn, sync::Mutex as AsyncMutex};
36
37pub struct AppState {
38 pub provider: WebChatProvider,
39 pub direct_line: Arc<dyn DirectLineApi>,
40 pub http_client: Client,
41 pub transport: SharedActivitiesTransport,
42 pub bus: SharedBus,
43 pub sessions: SharedSessionStore,
44 pub activity_poster: SharedDirectLinePoster,
45 pub oauth_client: Arc<dyn GreenticOauthClient>,
46 #[cfg(feature = "directline_standalone")]
47 pub conversations: SharedConversationStore,
48 circuit_settings: CircuitSettings,
49 token_circuits: CircuitRegistry,
50 conversation_circuits: CircuitRegistry,
51}
52
53impl Clone for AppState {
54 fn clone(&self) -> Self {
55 Self {
56 provider: self.provider.clone(),
57 direct_line: Arc::clone(&self.direct_line),
58 http_client: self.http_client.clone(),
59 transport: Arc::clone(&self.transport),
60 bus: Arc::clone(&self.bus),
61 sessions: Arc::clone(&self.sessions),
62 activity_poster: Arc::clone(&self.activity_poster),
63 oauth_client: Arc::clone(&self.oauth_client),
64 #[cfg(feature = "directline_standalone")]
65 conversations: Arc::clone(&self.conversations),
66 circuit_settings: self.circuit_settings.clone(),
67 token_circuits: self.token_circuits.clone(),
68 conversation_circuits: self.conversation_circuits.clone(),
69 }
70 }
71}
72
73impl AppState {
74 pub fn new(
75 provider: WebChatProvider,
76 direct_line: Arc<dyn DirectLineApi>,
77 http_client: Client,
78 ) -> Self {
79 let transport_client = http_client.clone();
80 let poster_client = http_client.clone();
81 let oauth_client_http = http_client.clone();
82 let transport: SharedActivitiesTransport =
83 Arc::new(ReqwestActivitiesTransport::new(transport_client));
84 let activity_poster: SharedDirectLinePoster =
85 Arc::new(HttpDirectLinePoster::new(poster_client));
86 let oauth_client: Arc<dyn GreenticOauthClient> =
87 Arc::new(ReqwestGreenticOauthClient::new(oauth_client_http));
88 let circuit_settings = CircuitSettings::default();
89 Self {
90 provider,
91 direct_line,
92 http_client,
93 transport,
94 bus: Arc::new(NoopBus),
95 sessions: Arc::new(MemorySessionStore::default()),
96 activity_poster,
97 oauth_client,
98 #[cfg(feature = "directline_standalone")]
99 conversations: noop_store(),
100 circuit_settings: circuit_settings.clone(),
101 token_circuits: CircuitRegistry::new(circuit_settings.clone()),
102 conversation_circuits: CircuitRegistry::new(circuit_settings),
103 }
104 }
105
106 pub fn with_bus(mut self, bus: SharedBus) -> Self {
107 self.bus = bus;
108 self
109 }
110
111 pub fn with_sessions(mut self, sessions: SharedSessionStore) -> Self {
112 self.sessions = sessions;
113 self
114 }
115
116 pub fn with_transport(mut self, transport: SharedActivitiesTransport) -> Self {
117 self.transport = transport;
118 self
119 }
120
121 pub fn with_activity_poster(mut self, poster: SharedDirectLinePoster) -> Self {
122 self.activity_poster = poster;
123 self
124 }
125
126 pub fn with_oauth_client(mut self, client: Arc<dyn GreenticOauthClient>) -> Self {
127 self.oauth_client = client;
128 self
129 }
130
131 #[cfg(feature = "directline_standalone")]
132 pub fn with_conversations(mut self, conversations: SharedConversationStore) -> Self {
133 self.conversations = conversations;
134 self
135 }
136
137 #[cfg(not(feature = "directline_standalone"))]
138 pub fn with_conversations(self, _conversations: SharedConversationStore) -> Self {
139 self
140 }
141
142 pub async fn post_activity(
143 &self,
144 conversation_id: &str,
145 bearer_token: &str,
146 activity: Value,
147 ) -> Result<(), WebChatError> {
148 validate_activity_for_post(&activity)?;
149 self.activity_poster
150 .post_activity(
151 self.provider.config().direct_line_base(),
152 conversation_id,
153 bearer_token,
154 activity,
155 )
156 .await
157 .map_err(WebChatError::from)
158 }
159}
160
161pub fn router(state: AppState) -> Router<AppState> {
162 Router::new()
163 .route("/webchat/healthz", get(healthz))
164 .route(
165 "/webchat/{env}/{tenant}/tokens/generate",
166 post(generate_token),
167 )
168 .route(
169 "/webchat/{env}/{tenant}/{team}/tokens/generate",
170 post(generate_token),
171 )
172 .route(
173 "/webchat/{env}/{tenant}/conversations/start",
174 post(start_conversation),
175 )
176 .route(
177 "/webchat/{env}/{tenant}/{team}/conversations/start",
178 post(start_conversation),
179 )
180 .route(
181 "/webchat/admin/{env}/{tenant}/post-activity",
182 post(admin_post_activity),
183 )
184 .route("/webchat/oauth/start", get(super::oauth::start))
185 .route("/webchat/oauth/callback", get(super::oauth::callback))
186 .with_state(state)
187}
188
189async fn healthz() -> StatusCode {
190 StatusCode::NO_CONTENT
191}
192
193#[derive(Deserialize)]
194struct TokenPath {
195 env: String,
196 tenant: String,
197 #[serde(default)]
198 team: Option<String>,
199}
200
201#[derive(Default, Deserialize)]
202struct GenerateTokenRequestBody {}
203
204#[derive(Serialize)]
205struct GenerateTokenResponse {
206 token: String,
207 expires_in: u64,
208}
209
210#[derive(Deserialize)]
211struct StartConversationRequest {
212 token: String,
213}
214
215#[derive(Serialize)]
216struct StartConversationResponse {
217 token: String,
218 #[serde(rename = "conversationId")]
219 conversation_id: String,
220 #[serde(rename = "streamUrl", skip_serializing_if = "Option::is_none")]
221 stream_url: Option<String>,
222 expires_in: u64,
223}
224
225async fn generate_token(
226 State(state): State<AppState>,
227 Path(path): Path<TokenPath>,
228 Json(_): Json<GenerateTokenRequestBody>,
229) -> Result<Json<GenerateTokenResponse>, WebChatError> {
230 let ctx = RouteContext::new(path.env, path.tenant, path.team);
231 let span = telemetry::span_for("tokens.generate", &ctx);
232 let _guard = span.enter();
233 let team_label = telemetry::team_or_dash(ctx.team());
234 let env_metric = ctx.env().to_string();
235 let tenant_metric = ctx.tenant().to_string();
236 let team_metric = team_label.to_string();
237 let tenant_ctx = build_tenant_ctx(&ctx);
238
239 let secret = state
240 .provider
241 .direct_line_secret(&tenant_ctx)
242 .await
243 .map_err(|err| {
244 counter!(
245 "webchat_errors_total",
246 "kind" => "secret_backend_error",
247 "env" => env_metric.clone(),
248 "tenant" => tenant_metric.clone(),
249 "team" => team_metric.clone()
250 )
251 .increment(1);
252 WebChatError::Internal(err)
253 })?
254 .ok_or_else(|| {
255 counter!(
256 "webchat_errors_total",
257 "kind" => "missing_secret",
258 "env" => env_metric.clone(),
259 "tenant" => tenant_metric.clone(),
260 "team" => team_metric.clone()
261 )
262 .increment(1);
263 WebChatError::MissingSecret
264 })?;
265
266 let secret = Arc::new(secret);
267 let response = call_with_circuit(
268 &state.token_circuits,
269 direct_line_circuit_key("tokens", &ctx),
270 direct_line_labels(&ctx),
271 {
272 let direct_line = Arc::clone(&state.direct_line);
273 let secret = Arc::clone(&secret);
274 move || {
275 let direct_line = Arc::clone(&direct_line);
276 let secret = Arc::clone(&secret);
277 async move { direct_line.generate_token(secret.as_ref()).await }
278 }
279 },
280 )
281 .await
282 .map_err(|err| map_directline_error("tokens.generate", err))?;
283
284 counter!(
285 "webchat_tokens_generated_total",
286 "env" => env_metric.clone(),
287 "tenant" => tenant_metric.clone(),
288 "team" => team_metric.clone()
289 )
290 .increment(1);
291
292 Ok(Json(token_response_body(response)))
293}
294
295async fn start_conversation(
296 State(state): State<AppState>,
297 Path(path): Path<TokenPath>,
298 Json(body): Json<StartConversationRequest>,
299) -> Result<Json<StartConversationResponse>, WebChatError> {
300 let ctx = RouteContext::new(path.env, path.tenant, path.team);
301 let span = telemetry::span_for("conversations.start", &ctx);
302 let _guard = span.enter();
303 let env_metric = ctx.env().to_string();
304 let tenant_metric = ctx.tenant().to_string();
305 let team_metric = telemetry::team_or_dash(ctx.team()).to_string();
306
307 let trimmed = body.token.trim();
308 if trimmed.is_empty() {
309 return Err(WebChatError::BadRequest("token is required"));
310 }
311
312 let token_for_retry = Arc::new(trimmed.to_string());
313 let conversation_response = call_with_circuit(
314 &state.conversation_circuits,
315 direct_line_circuit_key("conversations", &ctx),
316 direct_line_labels(&ctx),
317 {
318 let direct_line = Arc::clone(&state.direct_line);
319 let token = Arc::clone(&token_for_retry);
320 move || {
321 let direct_line = Arc::clone(&direct_line);
322 let token = Arc::clone(&token);
323 async move { direct_line.start_conversation(token.as_ref()).await }
324 }
325 },
326 )
327 .await
328 .map_err(|err| map_directline_error("conversations.start", err))?;
329
330 let tenant_ctx = build_tenant_ctx(&ctx);
331 state
332 .sessions
333 .upsert(WebchatSession::new(
334 conversation_response.conversation_id.clone(),
335 tenant_ctx.clone(),
336 conversation_response.token.clone(),
337 ))
338 .await
339 .map_err(WebChatError::Internal)?;
340
341 counter!(
342 "webchat_conversations_started_total",
343 "env" => env_metric.clone(),
344 "tenant" => tenant_metric.clone(),
345 "team" => team_metric.clone()
346 )
347 .increment(1);
348 let ingress_deps = IngressDeps {
349 bus: Arc::clone(&state.bus),
350 sessions: Arc::clone(&state.sessions),
351 dl_base: state.provider.config().direct_line_base().to_string(),
352 transport: Arc::clone(&state.transport),
353 circuit: state.circuit_settings.clone(),
354 };
355 let ingress_ctx = IngressCtx {
356 tenant_ctx,
357 conversation_id: conversation_response.conversation_id.clone(),
358 token: conversation_response.token.clone(),
359 };
360
361 spawn(async move {
362 if let Err(err) = run_poll_loop(ingress_deps, ingress_ctx).await {
363 warn!(error = %err, "webchat poll loop terminated");
364 }
365 });
366
367 Ok(Json(conversation_response_body(conversation_response)))
368}
369
370fn token_response_body(response: TokenResponse) -> GenerateTokenResponse {
371 GenerateTokenResponse {
372 token: response.token,
373 expires_in: response.expires_in.unwrap_or(DEFAULT_EXPIRY_SECONDS),
374 }
375}
376
377fn conversation_response_body(response: ConversationResponse) -> StartConversationResponse {
378 StartConversationResponse {
379 token: response.token,
380 conversation_id: response.conversation_id,
381 stream_url: response.stream_url,
382 expires_in: response.expires_in.unwrap_or(DEFAULT_EXPIRY_SECONDS),
383 }
384}
385
/// Expiry (30 minutes, in seconds) reported to clients when Direct Line does
/// not include one in its response.
const DEFAULT_EXPIRY_SECONDS: u64 = 30 * 60;
387
388fn build_tenant_ctx(ctx: &RouteContext) -> TenantCtx {
389 let mut tenant_ctx = TenantCtx::new(
390 EnvId(ctx.env().to_string()),
391 TenantId(ctx.tenant().to_string()),
392 );
393 if let Some(team) = ctx.team() {
394 let team_id = TeamId(team.to_string());
395 tenant_ctx = tenant_ctx.with_team(Some(team_id));
396 }
397 tenant_ctx
398}
399
400fn map_directline_error(action: &str, err: DirectLineError) -> WebChatError {
401 log_directline_error(action, &err);
402 WebChatError::from(err)
403}
404
/// Maximum number of retries (after the initial attempt) for a retryable
/// Direct Line call.
const MAX_DIRECT_LINE_RETRIES: u32 = 5;
/// Placeholder used as the conversation label on circuits that are not tied
/// to a single conversation.
const NO_CONVERSATION_LABEL: &str = "-";
407
408#[derive(Clone)]
409struct CircuitRegistry {
410 settings: CircuitSettings,
411 inner: Arc<AsyncMutex<HashMap<String, Arc<AsyncMutex<CircuitBreaker>>>>>,
412}
413
414impl CircuitRegistry {
415 fn new(settings: CircuitSettings) -> Self {
416 Self {
417 settings,
418 inner: Arc::new(AsyncMutex::new(HashMap::new())),
419 }
420 }
421
422 async fn circuit(&self, key: &str, labels: CircuitLabels) -> Arc<AsyncMutex<CircuitBreaker>> {
423 let mut guard = self.inner.lock().await;
424 if let Some(existing) = guard.get(key) {
425 existing.clone()
426 } else {
427 let circuit = Arc::new(AsyncMutex::new(CircuitBreaker::new(
428 self.settings.clone(),
429 labels,
430 )));
431 guard.insert(key.to_string(), circuit.clone());
432 circuit
433 }
434 }
435}
436
437fn direct_line_circuit_key(scope: &str, ctx: &RouteContext) -> String {
438 let team = telemetry::team_or_dash(ctx.team());
439 let team_slug = team.to_ascii_lowercase();
440 format!(
441 "{scope}:{env}:{tenant}:{team}",
442 scope = scope,
443 env = ctx.env().to_ascii_lowercase(),
444 tenant = ctx.tenant().to_ascii_lowercase(),
445 team = team_slug
446 )
447}
448
449fn direct_line_labels(ctx: &RouteContext) -> CircuitLabels {
450 CircuitLabels::new(
451 ctx.env().to_string(),
452 ctx.tenant().to_string(),
453 telemetry::team_or_dash(ctx.team()).to_string(),
454 NO_CONVERSATION_LABEL.to_string(),
455 )
456}
457
458fn is_retryable(err: &DirectLineError) -> bool {
459 matches!(err, DirectLineError::Transport(_))
460 || matches!(err, DirectLineError::Remote { status, .. }
461 if *status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error())
462}
463
464async fn call_with_circuit<F, Fut, T>(
465 registry: &CircuitRegistry,
466 key: String,
467 labels: CircuitLabels,
468 mut operation: F,
469) -> Result<T, DirectLineError>
470where
471 F: FnMut() -> Fut,
472 Fut: Future<Output = Result<T, DirectLineError>>,
473{
474 let circuit = registry.circuit(&key, labels).await;
475 let mut attempt: u32 = 0;
476 loop {
477 {
478 let mut guard = circuit.lock().await;
479 guard.before_request().await;
480 }
481
482 match operation().await {
483 Ok(value) => {
484 let mut guard = circuit.lock().await;
485 guard.on_success();
486 return Ok(value);
487 }
488 Err(err) => {
489 let retryable = is_retryable(&err);
490 {
491 let mut guard = circuit.lock().await;
492 guard.on_failure();
493 }
494
495 if retryable && attempt < MAX_DIRECT_LINE_RETRIES {
496 attempt = attempt.saturating_add(1);
497 if let DirectLineError::Remote {
498 retry_after: Some(delay),
499 ..
500 } = &err
501 {
502 tokio::time::sleep(*delay).await;
503 } else {
504 backoff::sleep(attempt).await;
505 }
506 continue;
507 }
508
509 return Err(err);
510 }
511 }
512 }
513}
514
515fn log_directline_error(action: &str, err: &DirectLineError) {
516 match err {
517 DirectLineError::Remote {
518 status,
519 retry_after,
520 ..
521 } => {
522 let status_label = status.as_str().to_string();
523 let endpoint_label = action.to_string();
524 warn!(%action, %status, retry_after = retry_after.map(|d| d.as_secs()), "direct line remote error");
525 counter!(
526 "webchat_errors_total",
527 "kind" => "directline_remote",
528 "endpoint" => endpoint_label,
529 "status" => status_label
530 )
531 .increment(1);
532 }
533 DirectLineError::Transport(source) => {
534 warn!(%action, error = %source, "direct line transport error");
535 counter!(
536 "webchat_errors_total",
537 "kind" => "directline_transport",
538 "endpoint" => action.to_string()
539 )
540 .increment(1);
541 }
542 DirectLineError::Decode(_) => {
543 warn!(%action, "direct line decode error");
544 counter!(
545 "webchat_errors_total",
546 "kind" => "directline_decode",
547 "endpoint" => action.to_string()
548 )
549 .increment(1);
550 }
551 DirectLineError::Config(_) => {
552 warn!(%action, "direct line configuration error");
553 counter!(
554 "webchat_errors_total",
555 "kind" => "directline_config",
556 "endpoint" => action.to_string()
557 )
558 .increment(1);
559 }
560 }
561}
562
/// Attachment content types accepted by `validate_activity_for_post`
/// (matched ASCII case-insensitively).
const ALLOWED_ATTACHMENT_TYPES: &[&str] = &[
    // Structured payloads.
    "application/vnd.microsoft.card.adaptive",
    "application/json",
    // Images.
    "image/png",
    "image/jpeg",
    "image/gif",
];
570
/// Largest accepted size (512 KiB) of an attachment's `content` once
/// serialized to JSON.
const MAX_ATTACHMENT_BYTES: usize = 512 * 1024;
572
573fn validate_activity_for_post(activity: &Value) -> Result<(), WebChatError> {
574 if let Some(attachments) = activity.get("attachments").and_then(Value::as_array) {
575 for attachment in attachments {
576 let content_type = attachment
577 .get("contentType")
578 .or_else(|| attachment.get("content_type"))
579 .and_then(Value::as_str)
580 .ok_or(WebChatError::BadRequest("attachment missing contentType"))?;
581
582 let allowed = ALLOWED_ATTACHMENT_TYPES
583 .iter()
584 .any(|allowed| content_type.eq_ignore_ascii_case(allowed));
585 if !allowed {
586 return Err(WebChatError::BadRequest(
587 "attachment content type not allowed",
588 ));
589 }
590
591 if let Some(content) = attachment.get("content") {
592 let serialized = serde_json::to_vec(content).map_err(|_| {
593 WebChatError::BadRequest("attachment content is not valid JSON")
594 })?;
595 if serialized.len() > MAX_ATTACHMENT_BYTES {
596 return Err(WebChatError::BadRequest(
597 "attachment content exceeds size limit",
598 ));
599 }
600 }
601 }
602 }
603
604 Ok(())
605}
606
607#[derive(Deserialize)]
608pub struct AdminPath {
609 pub env: String,
610 pub tenant: String,
611}
612
613#[derive(Deserialize)]
614pub struct AdminPostActivityRequest {
615 #[serde(default)]
616 pub team: Option<String>,
617 #[serde(rename = "conversation_id", default)]
618 pub conversation_id: Option<String>,
619 pub activity: Value,
620}
621
622#[derive(Serialize)]
623pub struct AdminPostActivityResponse {
624 pub posted: usize,
625 pub skipped: usize,
626}
627
628pub async fn admin_post_activity(
629 State(state): State<AppState>,
630 Path(path): Path<AdminPath>,
631 Json(body): Json<AdminPostActivityRequest>,
632) -> Result<Json<AdminPostActivityResponse>, WebChatError> {
633 let AdminPostActivityRequest {
634 team,
635 conversation_id,
636 activity,
637 } = body;
638
639 if !activity.is_object() {
640 return Err(WebChatError::BadRequest("activity must be an object"));
641 }
642
643 let team_filter = team.as_deref();
644
645 if let Some(conversation_id) = conversation_id.as_deref() {
646 let session = state
647 .sessions
648 .get(conversation_id)
649 .await
650 .map_err(WebChatError::Internal)?
651 .ok_or(WebChatError::NotFound("conversation not found"))?;
652
653 if !session
654 .tenant_ctx
655 .env
656 .as_ref()
657 .eq_ignore_ascii_case(&path.env)
658 || !session
659 .tenant_ctx
660 .tenant
661 .as_ref()
662 .eq_ignore_ascii_case(&path.tenant)
663 {
664 return Err(WebChatError::NotFound("conversation not found"));
665 }
666
667 if let Some(team) = team_filter
668 && !session
669 .tenant_ctx
670 .team
671 .as_ref()
672 .map(|value| value.as_ref().eq_ignore_ascii_case(team))
673 .unwrap_or(false)
674 {
675 return Err(WebChatError::NotFound("conversation not found"));
676 }
677
678 return append_and_broadcast(&state, &session, activity)
679 .await
680 .map(|posted| Json(AdminPostActivityResponse { posted, skipped: 0 }));
681 }
682
683 let sessions = state
684 .sessions
685 .list_by_tenant(&path.env, &path.tenant, team_filter)
686 .await
687 .map_err(WebChatError::Internal)?;
688
689 if sessions.is_empty() {
690 return Err(WebChatError::NotFound("no matching sessions"));
691 }
692
693 let mut posted = 0usize;
694 let mut skipped = 0usize;
695 for session in sessions {
696 match append_and_broadcast(&state, &session, activity.clone()).await {
697 Ok(count) => posted += count,
698 Err(WebChatError::BadRequest(_)) => {
699 skipped += 1;
700 continue;
701 }
702 Err(other) => return Err(other),
703 }
704 }
705
706 Ok(Json(AdminPostActivityResponse { posted, skipped }))
707}
708
/// Standalone variant: appends the activity to the local conversation store.
///
/// The JSON is parsed into an `Activity`, bot defaults are applied, and the
/// activity is appended to the session's conversation. If the store does not
/// know the conversation yet, it is created and the append is attempted once
/// more. Afterwards the session watermark is advanced to the stored
/// watermark plus one; a failure there is only logged.
///
/// Returns `Ok(1)` on success.
///
/// # Errors
///
/// * `WebChatError::BadRequest` when proactive messaging is disabled for the
///   session, the JSON does not deserialize into an `Activity`, or the store
///   reports that its quota is exceeded.
/// * `WebChatError::Internal` for any other store failure.
#[cfg(feature = "directline_standalone")]
async fn append_and_broadcast(
    state: &AppState,
    session: &WebchatSession,
    activity_json: Value,
) -> Result<usize, WebChatError> {
    if !session.proactive_ok {
        return Err(WebChatError::BadRequest("proactive messaging disabled"));
    }

    let mut activity: Activity = serde_json::from_value(activity_json)
        .map_err(|_| WebChatError::BadRequest("activity must match Bot Framework schema"))?;
    apply_bot_defaults(&mut activity, &session.conversation_id);

    // A clone goes into the first attempt so the original is still available
    // for the retry after creating the conversation.
    let first_attempt = state
        .conversations
        .append(&session.conversation_id, activity.clone())
        .await;

    let stored = match first_attempt {
        Ok(stored) => stored,
        Err(StoreError::NotFound(_)) => {
            state
                .conversations
                .create(&session.conversation_id, session.tenant_ctx.clone())
                .await
                .map_err(|err| WebChatError::Internal(err.into()))?;
            state
                .conversations
                .append(&session.conversation_id, activity)
                .await
                .map_err(|err| WebChatError::Internal(err.into()))?
        }
        Err(StoreError::QuotaExceeded(_)) => {
            return Err(WebChatError::BadRequest(
                "conversation backlog quota exceeded",
            ));
        }
        Err(err) => return Err(WebChatError::Internal(err.into())),
    };

    let next_watermark = (stored.watermark + 1).to_string();
    let watermark_update = state
        .sessions
        .update_watermark(&session.conversation_id, Some(next_watermark))
        .await;
    if let Err(err) = watermark_update {
        // Best effort: the activity is already stored.
        warn!(error = %err, "failed to update watermark");
    }

    Ok(1)
}
761
/// Stamps an outgoing activity as coming from the bot and makes sure it names
/// a conversation.
///
/// * `from` is created if missing; a blank `from.id` becomes `"bot"` and
///   `from.role` is always set to `"bot"`.
/// * `conversation` is set to `conversation_id` when it is missing or its id
///   is blank; a non-blank existing id is left untouched.
#[cfg(feature = "directline_standalone")]
fn apply_bot_defaults(activity: &mut Activity, conversation_id: &str) {
    // Start from an account with the default id when none is present; the
    // role is assigned unconditionally just below.
    let sender = activity.from.get_or_insert_with(|| ChannelAccount {
        id: "bot".into(),
        name: None,
        role: None,
    });
    if sender.id.trim().is_empty() {
        sender.id = "bot".into();
    }
    sender.role = Some("bot".into());

    let needs_conversation = match &activity.conversation {
        Some(conv) => conv.id.trim().is_empty(),
        None => true,
    };
    if needs_conversation {
        activity.conversation = Some(ConversationAccount {
            id: conversation_id.to_string(),
        });
    }
}
790
791#[cfg(not(feature = "directline_standalone"))]
792async fn append_and_broadcast(
793 state: &AppState,
794 session: &WebchatSession,
795 activity: Value,
796) -> Result<usize, WebChatError> {
797 if !session.proactive_ok {
798 return Err(WebChatError::BadRequest("proactive messaging disabled"));
799 }
800 state
801 .post_activity(&session.conversation_id, &session.bearer_token, activity)
802 .await?;
803 Ok(1)
804}
805
806pub type SharedDirectLinePoster = Arc<dyn DirectLinePoster>;
807
808#[async_trait]
809pub trait DirectLinePoster: Send + Sync {
810 async fn post_activity(
811 &self,
812 base_url: &str,
813 conversation_id: &str,
814 bearer_token: &str,
815 activity: Value,
816 ) -> Result<(), DirectLineError>;
817}
818
819pub struct HttpDirectLinePoster {
820 client: Client,
821}
822
823impl HttpDirectLinePoster {
824 pub fn new(client: Client) -> Self {
825 Self { client }
826 }
827}
828
829#[async_trait]
830impl DirectLinePoster for HttpDirectLinePoster {
831 async fn post_activity(
832 &self,
833 base_url: &str,
834 conversation_id: &str,
835 bearer_token: &str,
836 activity: Value,
837 ) -> Result<(), DirectLineError> {
838 let url = format!(
839 "{}/conversations/{}/activities",
840 base_url.trim_end_matches('/'),
841 conversation_id
842 );
843
844 let response = self
845 .client
846 .post(url)
847 .bearer_auth(bearer_token)
848 .json(&activity)
849 .send()
850 .await
851 .map_err(DirectLineError::Transport)?;
852
853 let status = response.status();
854 if status.is_success() {
855 return Ok(());
856 }
857
858 let retry_after = response
859 .headers()
860 .get(axum::http::header::RETRY_AFTER)
861 .and_then(|value| value.to_str().ok())
862 .and_then(|value| value.parse::<u64>().ok())
863 .map(std::time::Duration::from_secs);
864 let body = response
865 .text()
866 .await
867 .unwrap_or_else(|_| "<unreadable>".to_string());
868 let message = if body.len() > 512 {
869 body[..512].to_string()
870 } else {
871 body
872 };
873
874 Err(DirectLineError::Remote {
875 status,
876 retry_after,
877 message,
878 })
879 }
880}
881
#[cfg(test)]
mod tests {
    use super::*;

    /// The health probe answers `204 No Content`.
    #[tokio::test]
    async fn healthz_returns_no_content() {
        let status = healthz().await;
        assert_eq!(status, StatusCode::NO_CONTENT);
    }
}