1use std::{collections::HashMap, future::Future, sync::Arc};
2
3use axum::http::StatusCode;
4use axum::{
5 Json, Router,
6 extract::{Path, State},
7 routing::{get, post},
8};
9use metrics::counter;
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use tracing::warn;
13
14use super::conversation::{
15 Activity, ChannelAccount, ConversationAccount, SharedConversationStore, StoreError, noop_store,
16};
17use super::{
18 WebChatProvider, backoff,
19 bus::{NoopBus, SharedBus},
20 circuit::{CircuitBreaker, CircuitLabels, CircuitSettings},
21 directline_client::{ConversationResponse, DirectLineApi, DirectLineError, TokenResponse},
22 error::WebChatError,
23 ingress::{
24 IngressCtx, IngressDeps, ReqwestActivitiesTransport, SharedActivitiesTransport,
25 run_poll_loop,
26 },
27 oauth::{GreenticOauthClient, ReqwestGreenticOauthClient},
28 provider::RouteContext,
29 session::{MemorySessionStore, SharedSessionStore, WebchatSession},
30 telemetry,
31};
32use async_trait::async_trait;
33use greentic_types::{EnvId, TeamId, TenantCtx, TenantId};
34use reqwest::Client;
35use tokio::{spawn, sync::Mutex as AsyncMutex};
36
37pub struct AppState {
38 pub provider: WebChatProvider,
39 pub direct_line: Arc<dyn DirectLineApi>,
40 pub http_client: Client,
41 pub transport: SharedActivitiesTransport,
42 pub bus: SharedBus,
43 pub sessions: SharedSessionStore,
44 pub activity_poster: SharedDirectLinePoster,
45 pub oauth_client: Arc<dyn GreenticOauthClient>,
46 pub conversations: SharedConversationStore,
47 circuit_settings: CircuitSettings,
48 token_circuits: CircuitRegistry,
49 conversation_circuits: CircuitRegistry,
50}
51
52impl Clone for AppState {
53 fn clone(&self) -> Self {
54 Self {
55 provider: self.provider.clone(),
56 direct_line: Arc::clone(&self.direct_line),
57 http_client: self.http_client.clone(),
58 transport: Arc::clone(&self.transport),
59 bus: Arc::clone(&self.bus),
60 sessions: Arc::clone(&self.sessions),
61 activity_poster: Arc::clone(&self.activity_poster),
62 oauth_client: Arc::clone(&self.oauth_client),
63 conversations: Arc::clone(&self.conversations),
64 circuit_settings: self.circuit_settings.clone(),
65 token_circuits: self.token_circuits.clone(),
66 conversation_circuits: self.conversation_circuits.clone(),
67 }
68 }
69}
70
71impl AppState {
72 pub fn new(
73 provider: WebChatProvider,
74 direct_line: Arc<dyn DirectLineApi>,
75 http_client: Client,
76 ) -> Self {
77 let transport_client = http_client.clone();
78 let poster_client = http_client.clone();
79 let oauth_client_http = http_client.clone();
80 let transport: SharedActivitiesTransport =
81 Arc::new(ReqwestActivitiesTransport::new(transport_client));
82 let activity_poster: SharedDirectLinePoster =
83 Arc::new(HttpDirectLinePoster::new(poster_client));
84 let oauth_client: Arc<dyn GreenticOauthClient> =
85 Arc::new(ReqwestGreenticOauthClient::new(oauth_client_http));
86 let circuit_settings = CircuitSettings::default();
87 Self {
88 provider,
89 direct_line,
90 http_client,
91 transport,
92 bus: Arc::new(NoopBus),
93 sessions: Arc::new(MemorySessionStore::default()),
94 activity_poster,
95 oauth_client,
96 conversations: noop_store(),
97 circuit_settings: circuit_settings.clone(),
98 token_circuits: CircuitRegistry::new(circuit_settings.clone()),
99 conversation_circuits: CircuitRegistry::new(circuit_settings),
100 }
101 }
102
103 pub fn with_bus(mut self, bus: SharedBus) -> Self {
104 self.bus = bus;
105 self
106 }
107
108 pub fn with_sessions(mut self, sessions: SharedSessionStore) -> Self {
109 self.sessions = sessions;
110 self
111 }
112
113 pub fn with_transport(mut self, transport: SharedActivitiesTransport) -> Self {
114 self.transport = transport;
115 self
116 }
117
118 pub fn with_activity_poster(mut self, poster: SharedDirectLinePoster) -> Self {
119 self.activity_poster = poster;
120 self
121 }
122
123 pub fn with_oauth_client(mut self, client: Arc<dyn GreenticOauthClient>) -> Self {
124 self.oauth_client = client;
125 self
126 }
127
128 pub fn with_conversations(mut self, conversations: SharedConversationStore) -> Self {
129 self.conversations = conversations;
130 self
131 }
132
133 pub async fn post_activity(
134 &self,
135 conversation_id: &str,
136 bearer_token: &str,
137 activity: Value,
138 ) -> Result<(), WebChatError> {
139 validate_activity_for_post(&activity)?;
140 self.activity_poster
141 .post_activity(
142 self.provider.config().direct_line_base(),
143 conversation_id,
144 bearer_token,
145 activity,
146 )
147 .await
148 .map_err(WebChatError::from)
149 }
150}
151
152pub fn router(state: AppState) -> Router<AppState> {
153 Router::new()
154 .route("/webchat/healthz", get(healthz))
155 .route(
156 "/webchat/{env}/{tenant}/tokens/generate",
157 post(generate_token),
158 )
159 .route(
160 "/webchat/{env}/{tenant}/{team}/tokens/generate",
161 post(generate_token),
162 )
163 .route(
164 "/webchat/{env}/{tenant}/conversations/start",
165 post(start_conversation),
166 )
167 .route(
168 "/webchat/{env}/{tenant}/{team}/conversations/start",
169 post(start_conversation),
170 )
171 .route(
172 "/webchat/admin/{env}/{tenant}/post-activity",
173 post(admin_post_activity),
174 )
175 .route("/webchat/oauth/start", get(super::oauth::start))
176 .route("/webchat/oauth/callback", get(super::oauth::callback))
177 .with_state(state)
178}
179
180async fn healthz() -> StatusCode {
181 StatusCode::NO_CONTENT
182}
183
184#[derive(Deserialize)]
185struct TokenPath {
186 env: String,
187 tenant: String,
188 #[serde(default)]
189 team: Option<String>,
190}
191
192#[derive(Default, Deserialize)]
193struct GenerateTokenRequestBody {}
194
195#[derive(Serialize)]
196struct GenerateTokenResponse {
197 token: String,
198 expires_in: u64,
199}
200
201#[derive(Deserialize)]
202struct StartConversationRequest {
203 token: String,
204}
205
206#[derive(Serialize)]
207struct StartConversationResponse {
208 token: String,
209 #[serde(rename = "conversationId")]
210 conversation_id: String,
211 #[serde(rename = "streamUrl", skip_serializing_if = "Option::is_none")]
212 stream_url: Option<String>,
213 expires_in: u64,
214}
215
216async fn generate_token(
217 State(state): State<AppState>,
218 Path(path): Path<TokenPath>,
219 Json(_): Json<GenerateTokenRequestBody>,
220) -> Result<Json<GenerateTokenResponse>, WebChatError> {
221 let ctx = RouteContext::new(path.env, path.tenant, path.team);
222 let span = telemetry::span_for("tokens.generate", &ctx);
223 let _guard = span.enter();
224 let team_label = telemetry::team_or_dash(ctx.team());
225 let env_metric = ctx.env().to_string();
226 let tenant_metric = ctx.tenant().to_string();
227 let team_metric = team_label.to_string();
228 let tenant_ctx = build_tenant_ctx(&ctx);
229
230 let secret = state
231 .provider
232 .direct_line_secret(&tenant_ctx)
233 .await
234 .map_err(|err| {
235 counter!(
236 "webchat_errors_total",
237 "kind" => "secret_backend_error",
238 "env" => env_metric.clone(),
239 "tenant" => tenant_metric.clone(),
240 "team" => team_metric.clone()
241 )
242 .increment(1);
243 WebChatError::Internal(err)
244 })?
245 .ok_or_else(|| {
246 counter!(
247 "webchat_errors_total",
248 "kind" => "missing_secret",
249 "env" => env_metric.clone(),
250 "tenant" => tenant_metric.clone(),
251 "team" => team_metric.clone()
252 )
253 .increment(1);
254 WebChatError::MissingSecret
255 })?;
256
257 let secret = Arc::new(secret);
258 let response = call_with_circuit(
259 &state.token_circuits,
260 direct_line_circuit_key("tokens", &ctx),
261 direct_line_labels(&ctx),
262 {
263 let direct_line = Arc::clone(&state.direct_line);
264 let secret = Arc::clone(&secret);
265 move || {
266 let direct_line = Arc::clone(&direct_line);
267 let secret = Arc::clone(&secret);
268 async move { direct_line.generate_token(secret.as_ref()).await }
269 }
270 },
271 )
272 .await
273 .map_err(|err| map_directline_error("tokens.generate", err))?;
274
275 counter!(
276 "webchat_tokens_generated_total",
277 "env" => env_metric.clone(),
278 "tenant" => tenant_metric.clone(),
279 "team" => team_metric.clone()
280 )
281 .increment(1);
282
283 Ok(Json(token_response_body(response)))
284}
285
286async fn start_conversation(
287 State(state): State<AppState>,
288 Path(path): Path<TokenPath>,
289 Json(body): Json<StartConversationRequest>,
290) -> Result<Json<StartConversationResponse>, WebChatError> {
291 let ctx = RouteContext::new(path.env, path.tenant, path.team);
292 let span = telemetry::span_for("conversations.start", &ctx);
293 let _guard = span.enter();
294 let env_metric = ctx.env().to_string();
295 let tenant_metric = ctx.tenant().to_string();
296 let team_metric = telemetry::team_or_dash(ctx.team()).to_string();
297
298 let trimmed = body.token.trim();
299 if trimmed.is_empty() {
300 return Err(WebChatError::BadRequest("token is required"));
301 }
302
303 let token_for_retry = Arc::new(trimmed.to_string());
304 let conversation_response = call_with_circuit(
305 &state.conversation_circuits,
306 direct_line_circuit_key("conversations", &ctx),
307 direct_line_labels(&ctx),
308 {
309 let direct_line = Arc::clone(&state.direct_line);
310 let token = Arc::clone(&token_for_retry);
311 move || {
312 let direct_line = Arc::clone(&direct_line);
313 let token = Arc::clone(&token);
314 async move { direct_line.start_conversation(token.as_ref()).await }
315 }
316 },
317 )
318 .await
319 .map_err(|err| map_directline_error("conversations.start", err))?;
320
321 let tenant_ctx = build_tenant_ctx(&ctx);
322 state
323 .sessions
324 .upsert(WebchatSession::new(
325 conversation_response.conversation_id.clone(),
326 tenant_ctx.clone(),
327 conversation_response.token.clone(),
328 ))
329 .await
330 .map_err(WebChatError::Internal)?;
331
332 counter!(
333 "webchat_conversations_started_total",
334 "env" => env_metric.clone(),
335 "tenant" => tenant_metric.clone(),
336 "team" => team_metric.clone()
337 )
338 .increment(1);
339 let ingress_deps = IngressDeps {
340 bus: Arc::clone(&state.bus),
341 sessions: Arc::clone(&state.sessions),
342 dl_base: state.provider.config().direct_line_base().to_string(),
343 transport: Arc::clone(&state.transport),
344 circuit: state.circuit_settings.clone(),
345 };
346 let ingress_ctx = IngressCtx {
347 tenant_ctx,
348 conversation_id: conversation_response.conversation_id.clone(),
349 token: conversation_response.token.clone(),
350 };
351
352 spawn(async move {
353 if let Err(err) = run_poll_loop(ingress_deps, ingress_ctx).await {
354 warn!(error = %err, "webchat poll loop terminated");
355 }
356 });
357
358 Ok(Json(conversation_response_body(conversation_response)))
359}
360
361fn token_response_body(response: TokenResponse) -> GenerateTokenResponse {
362 GenerateTokenResponse {
363 token: response.token,
364 expires_in: response.expires_in.unwrap_or(DEFAULT_EXPIRY_SECONDS),
365 }
366}
367
368fn conversation_response_body(response: ConversationResponse) -> StartConversationResponse {
369 StartConversationResponse {
370 token: response.token,
371 conversation_id: response.conversation_id,
372 stream_url: response.stream_url,
373 expires_in: response.expires_in.unwrap_or(DEFAULT_EXPIRY_SECONDS),
374 }
375}
376
377const DEFAULT_EXPIRY_SECONDS: u64 = 1800;
378
379fn build_tenant_ctx(ctx: &RouteContext) -> TenantCtx {
380 let mut tenant_ctx = TenantCtx::new(
381 EnvId(ctx.env().to_string()),
382 TenantId(ctx.tenant().to_string()),
383 );
384 if let Some(team) = ctx.team() {
385 let team_id = TeamId(team.to_string());
386 tenant_ctx = tenant_ctx.with_team(Some(team_id));
387 }
388 tenant_ctx
389}
390
391fn map_directline_error(action: &str, err: DirectLineError) -> WebChatError {
392 log_directline_error(action, &err);
393 WebChatError::from(err)
394}
395
396const MAX_DIRECT_LINE_RETRIES: u32 = 5;
397const NO_CONVERSATION_LABEL: &str = "-";
398
399#[derive(Clone)]
400struct CircuitRegistry {
401 settings: CircuitSettings,
402 inner: Arc<AsyncMutex<HashMap<String, Arc<AsyncMutex<CircuitBreaker>>>>>,
403}
404
405impl CircuitRegistry {
406 fn new(settings: CircuitSettings) -> Self {
407 Self {
408 settings,
409 inner: Arc::new(AsyncMutex::new(HashMap::new())),
410 }
411 }
412
413 async fn circuit(&self, key: &str, labels: CircuitLabels) -> Arc<AsyncMutex<CircuitBreaker>> {
414 let mut guard = self.inner.lock().await;
415 if let Some(existing) = guard.get(key) {
416 existing.clone()
417 } else {
418 let circuit = Arc::new(AsyncMutex::new(CircuitBreaker::new(
419 self.settings.clone(),
420 labels,
421 )));
422 guard.insert(key.to_string(), circuit.clone());
423 circuit
424 }
425 }
426}
427
428fn direct_line_circuit_key(scope: &str, ctx: &RouteContext) -> String {
429 let team = telemetry::team_or_dash(ctx.team());
430 let team_slug = team.to_ascii_lowercase();
431 format!(
432 "{scope}:{env}:{tenant}:{team}",
433 scope = scope,
434 env = ctx.env().to_ascii_lowercase(),
435 tenant = ctx.tenant().to_ascii_lowercase(),
436 team = team_slug
437 )
438}
439
440fn direct_line_labels(ctx: &RouteContext) -> CircuitLabels {
441 CircuitLabels::new(
442 ctx.env().to_string(),
443 ctx.tenant().to_string(),
444 telemetry::team_or_dash(ctx.team()).to_string(),
445 NO_CONVERSATION_LABEL.to_string(),
446 )
447}
448
449fn is_retryable(err: &DirectLineError) -> bool {
450 matches!(err, DirectLineError::Transport(_))
451 || matches!(err, DirectLineError::Remote { status, .. }
452 if *status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error())
453}
454
455async fn call_with_circuit<F, Fut, T>(
456 registry: &CircuitRegistry,
457 key: String,
458 labels: CircuitLabels,
459 mut operation: F,
460) -> Result<T, DirectLineError>
461where
462 F: FnMut() -> Fut,
463 Fut: Future<Output = Result<T, DirectLineError>>,
464{
465 let circuit = registry.circuit(&key, labels).await;
466 let mut attempt: u32 = 0;
467 loop {
468 {
469 let mut guard = circuit.lock().await;
470 guard.before_request().await;
471 }
472
473 match operation().await {
474 Ok(value) => {
475 let mut guard = circuit.lock().await;
476 guard.on_success();
477 return Ok(value);
478 }
479 Err(err) => {
480 let retryable = is_retryable(&err);
481 {
482 let mut guard = circuit.lock().await;
483 guard.on_failure();
484 }
485
486 if retryable && attempt < MAX_DIRECT_LINE_RETRIES {
487 attempt = attempt.saturating_add(1);
488 if let DirectLineError::Remote {
489 retry_after: Some(delay),
490 ..
491 } = &err
492 {
493 tokio::time::sleep(*delay).await;
494 } else {
495 backoff::sleep(attempt).await;
496 }
497 continue;
498 }
499
500 return Err(err);
501 }
502 }
503 }
504}
505
506fn log_directline_error(action: &str, err: &DirectLineError) {
507 match err {
508 DirectLineError::Remote {
509 status,
510 retry_after,
511 ..
512 } => {
513 let status_label = status.as_str().to_string();
514 let endpoint_label = action.to_string();
515 warn!(%action, %status, retry_after = retry_after.map(|d| d.as_secs()), "direct line remote error");
516 counter!(
517 "webchat_errors_total",
518 "kind" => "directline_remote",
519 "endpoint" => endpoint_label,
520 "status" => status_label
521 )
522 .increment(1);
523 }
524 DirectLineError::Transport(source) => {
525 warn!(%action, error = %source, "direct line transport error");
526 counter!(
527 "webchat_errors_total",
528 "kind" => "directline_transport",
529 "endpoint" => action.to_string()
530 )
531 .increment(1);
532 }
533 DirectLineError::Decode(_) => {
534 warn!(%action, "direct line decode error");
535 counter!(
536 "webchat_errors_total",
537 "kind" => "directline_decode",
538 "endpoint" => action.to_string()
539 )
540 .increment(1);
541 }
542 DirectLineError::Config(_) => {
543 warn!(%action, "direct line configuration error");
544 counter!(
545 "webchat_errors_total",
546 "kind" => "directline_config",
547 "endpoint" => action.to_string()
548 )
549 .increment(1);
550 }
551 }
552}
553
554const ALLOWED_ATTACHMENT_TYPES: &[&str] = &[
555 "application/vnd.microsoft.card.adaptive",
556 "application/json",
557 "image/png",
558 "image/jpeg",
559 "image/gif",
560];
561
562const MAX_ATTACHMENT_BYTES: usize = 512 * 1024;
563
564fn validate_activity_for_post(activity: &Value) -> Result<(), WebChatError> {
565 if let Some(attachments) = activity.get("attachments").and_then(Value::as_array) {
566 for attachment in attachments {
567 let content_type = attachment
568 .get("contentType")
569 .or_else(|| attachment.get("content_type"))
570 .and_then(Value::as_str)
571 .ok_or(WebChatError::BadRequest("attachment missing contentType"))?;
572
573 let allowed = ALLOWED_ATTACHMENT_TYPES
574 .iter()
575 .any(|allowed| content_type.eq_ignore_ascii_case(allowed));
576 if !allowed {
577 return Err(WebChatError::BadRequest(
578 "attachment content type not allowed",
579 ));
580 }
581
582 if let Some(content) = attachment.get("content") {
583 let serialized = serde_json::to_vec(content).map_err(|_| {
584 WebChatError::BadRequest("attachment content is not valid JSON")
585 })?;
586 if serialized.len() > MAX_ATTACHMENT_BYTES {
587 return Err(WebChatError::BadRequest(
588 "attachment content exceeds size limit",
589 ));
590 }
591 }
592 }
593 }
594
595 Ok(())
596}
597
598#[derive(Deserialize)]
599pub struct AdminPath {
600 pub env: String,
601 pub tenant: String,
602}
603
604#[derive(Deserialize)]
605pub struct AdminPostActivityRequest {
606 #[serde(default)]
607 pub team: Option<String>,
608 #[serde(rename = "conversation_id", default)]
609 pub conversation_id: Option<String>,
610 pub activity: Value,
611}
612
613#[derive(Serialize)]
614pub struct AdminPostActivityResponse {
615 pub posted: usize,
616 pub skipped: usize,
617}
618
619pub async fn admin_post_activity(
620 State(state): State<AppState>,
621 Path(path): Path<AdminPath>,
622 Json(body): Json<AdminPostActivityRequest>,
623) -> Result<Json<AdminPostActivityResponse>, WebChatError> {
624 let AdminPostActivityRequest {
625 team,
626 conversation_id,
627 activity,
628 } = body;
629
630 if !activity.is_object() {
631 return Err(WebChatError::BadRequest("activity must be an object"));
632 }
633
634 let team_filter = team.as_deref();
635
636 if let Some(conversation_id) = conversation_id.as_deref() {
637 let session = state
638 .sessions
639 .get(conversation_id)
640 .await
641 .map_err(WebChatError::Internal)?
642 .ok_or(WebChatError::NotFound("conversation not found"))?;
643
644 if !session
645 .tenant_ctx
646 .env
647 .as_ref()
648 .eq_ignore_ascii_case(&path.env)
649 || !session
650 .tenant_ctx
651 .tenant
652 .as_ref()
653 .eq_ignore_ascii_case(&path.tenant)
654 {
655 return Err(WebChatError::NotFound("conversation not found"));
656 }
657
658 if let Some(team) = team_filter
659 && !session
660 .tenant_ctx
661 .team
662 .as_ref()
663 .map(|value| value.as_ref().eq_ignore_ascii_case(team))
664 .unwrap_or(false)
665 {
666 return Err(WebChatError::NotFound("conversation not found"));
667 }
668
669 return append_and_broadcast(&state, &session, activity)
670 .await
671 .map(|posted| Json(AdminPostActivityResponse { posted, skipped: 0 }));
672 }
673
674 let sessions = state
675 .sessions
676 .list_by_tenant(&path.env, &path.tenant, team_filter)
677 .await
678 .map_err(WebChatError::Internal)?;
679
680 if sessions.is_empty() {
681 return Err(WebChatError::NotFound("no matching sessions"));
682 }
683
684 let mut posted = 0usize;
685 let mut skipped = 0usize;
686 for session in sessions {
687 match append_and_broadcast(&state, &session, activity.clone()).await {
688 Ok(count) => posted += count,
689 Err(WebChatError::BadRequest(_)) => {
690 skipped += 1;
691 continue;
692 }
693 Err(other) => return Err(other),
694 }
695 }
696
697 Ok(Json(AdminPostActivityResponse { posted, skipped }))
698}
699
700async fn append_and_broadcast(
701 state: &AppState,
702 session: &WebchatSession,
703 activity_json: Value,
704) -> Result<usize, WebChatError> {
705 if !session.proactive_ok {
706 return Err(WebChatError::BadRequest("proactive messaging disabled"));
707 }
708 let mut activity: Activity = serde_json::from_value(activity_json)
709 .map_err(|_| WebChatError::BadRequest("activity must match Bot Framework schema"))?;
710 apply_bot_defaults(&mut activity, &session.conversation_id);
711
712 let stored = match state
713 .conversations
714 .append(&session.conversation_id, activity.clone())
715 .await
716 {
717 Ok(stored) => stored,
718 Err(StoreError::NotFound(_)) => {
719 state
720 .conversations
721 .create(&session.conversation_id, session.tenant_ctx.clone())
722 .await
723 .map_err(|err| WebChatError::Internal(err.into()))?;
724 state
725 .conversations
726 .append(&session.conversation_id, activity)
727 .await
728 .map_err(|err| WebChatError::Internal(err.into()))?
729 }
730 Err(StoreError::QuotaExceeded(_)) => {
731 return Err(WebChatError::BadRequest(
732 "conversation backlog quota exceeded",
733 ));
734 }
735 Err(err) => return Err(WebChatError::Internal(err.into())),
736 };
737
738 if let Err(err) = state
739 .sessions
740 .update_watermark(
741 &session.conversation_id,
742 Some((stored.watermark + 1).to_string()),
743 )
744 .await
745 {
746 warn!(error = %err, "failed to update watermark");
747 }
748
749 Ok(1)
750}
751
752fn apply_bot_defaults(activity: &mut Activity, conversation_id: &str) {
753 match activity.from.as_mut() {
754 Some(from) => {
755 if from.id.trim().is_empty() {
756 from.id = "bot".into();
757 }
758 from.role = Some("bot".into());
759 }
760 None => {
761 activity.from = Some(ChannelAccount {
762 id: "bot".into(),
763 name: None,
764 role: Some("bot".into()),
765 });
766 }
767 }
768 if activity
769 .conversation
770 .as_ref()
771 .map(|conv| conv.id.trim().is_empty())
772 .unwrap_or(true)
773 {
774 activity.conversation = Some(ConversationAccount {
775 id: conversation_id.to_string(),
776 });
777 }
778}
779
780pub type SharedDirectLinePoster = Arc<dyn DirectLinePoster>;
781
782#[async_trait]
783pub trait DirectLinePoster: Send + Sync {
784 async fn post_activity(
785 &self,
786 base_url: &str,
787 conversation_id: &str,
788 bearer_token: &str,
789 activity: Value,
790 ) -> Result<(), DirectLineError>;
791}
792
793pub struct HttpDirectLinePoster {
794 client: Client,
795}
796
797impl HttpDirectLinePoster {
798 pub fn new(client: Client) -> Self {
799 Self { client }
800 }
801}
802
803#[async_trait]
804impl DirectLinePoster for HttpDirectLinePoster {
805 async fn post_activity(
806 &self,
807 base_url: &str,
808 conversation_id: &str,
809 bearer_token: &str,
810 activity: Value,
811 ) -> Result<(), DirectLineError> {
812 let url = format!(
813 "{}/conversations/{}/activities",
814 base_url.trim_end_matches('/'),
815 conversation_id
816 );
817
818 let response = self
819 .client
820 .post(url)
821 .bearer_auth(bearer_token)
822 .json(&activity)
823 .send()
824 .await
825 .map_err(DirectLineError::Transport)?;
826
827 let status = response.status();
828 if status.is_success() {
829 return Ok(());
830 }
831
832 let retry_after = response
833 .headers()
834 .get(axum::http::header::RETRY_AFTER)
835 .and_then(|value| value.to_str().ok())
836 .and_then(|value| value.parse::<u64>().ok())
837 .map(std::time::Duration::from_secs);
838 let body = response
839 .text()
840 .await
841 .unwrap_or_else(|_| "<unreadable>".to_string());
842 let message = if body.len() > 512 {
843 body[..512].to_string()
844 } else {
845 body
846 };
847
848 Err(DirectLineError::Remote {
849 status,
850 retry_after,
851 message,
852 })
853 }
854}
855
856#[cfg(test)]
857mod tests {
858 use super::*;
859
860 #[tokio::test]
861 async fn healthz_returns_no_content() {
862 assert_eq!(healthz().await, StatusCode::NO_CONTENT);
863 }
864}