Skip to main content

systemprompt_api/services/gateway/audit/
mod.rs

1//! Persistence of gateway request lifecycle to the AI-request audit trail.
2//!
3//! [`GatewayAudit`] opens a record when a request arrives (see the `open`
4//! submodule), records the canonical messages and request payload, then closes
5//! it on completion with token usage, resolved cost, latency, captured tool
6//! calls, and the response payload (see the `complete` submodule) — or marks it
7//! failed. [`GatewayRequestContext`] carries the identifiers and routing
8//! metadata bound to a single request.
9
10mod complete;
11mod message_text;
12mod open;
13pub mod payload;
14
15use std::sync::{Arc, Mutex};
16use std::time::Instant;
17
18use anyhow::Result;
19use systemprompt_ai::repository::{AiRequestPayloadRepository, AiRequestRepository};
20use systemprompt_database::DbPool;
21use systemprompt_identifiers::{
22    AiRequestId, ContextId, GatewayConversationId, SessionId, TraceId, UserId,
23};
24
25#[derive(Debug, Clone)]
26pub struct GatewayRequestContext {
27    pub ai_request_id: AiRequestId,
28    pub user_id: UserId,
29    pub session_id: Option<SessionId>,
30    pub context_id: ContextId,
31    pub gateway_conversation_id: Option<GatewayConversationId>,
32    pub trace_id: Option<TraceId>,
33    pub provider: String,
34    pub model: String,
35    pub max_tokens: Option<u32>,
36    pub is_streaming: bool,
37    pub wire_protocol: String,
38}
39
40#[expect(
41    missing_debug_implementations,
42    reason = "service type holds repository clients that intentionally do not implement Debug"
43)]
44pub struct GatewayAudit {
45    requests: Arc<AiRequestRepository>,
46    payloads: Arc<AiRequestPayloadRepository>,
47    pub ctx: GatewayRequestContext,
48    served_model: Mutex<Option<String>>,
49    started_at: Instant,
50}
51
52impl GatewayAudit {
53    pub fn new(
54        db: &DbPool,
55        ctx: GatewayRequestContext,
56    ) -> Result<Self, systemprompt_ai::error::RepositoryError> {
57        let requests = Arc::new(AiRequestRepository::new(db)?);
58        let payloads = Arc::new(AiRequestPayloadRepository::new(db)?);
59        Ok(Self {
60            requests,
61            payloads,
62            ctx,
63            served_model: Mutex::new(None),
64            started_at: Instant::now(),
65        })
66    }
67
68    pub async fn set_served_model(&self, model: &str) {
69        if model.is_empty() || model == self.ctx.model {
70            return;
71        }
72        if let Ok(mut slot) = self.served_model.lock() {
73            *slot = Some(model.to_owned());
74        }
75        if let Err(e) = self
76            .requests
77            .update_model(&self.ctx.ai_request_id, model)
78            .await
79        {
80            tracing::warn!(error = %e, "update_model failed");
81        }
82    }
83
84    pub async fn fail(&self, error: &str) -> Result<()> {
85        if let Err(e) = self
86            .requests
87            .update_error(&self.ctx.ai_request_id, error)
88            .await
89        {
90            tracing::warn!(error = %e, "audit fail update failed");
91        }
92        tracing::info!(
93            ai_request_id = %self.ctx.ai_request_id,
94            user_id = %self.ctx.user_id,
95            provider = %self.ctx.provider,
96            model = %self.ctx.model,
97            error,
98            "Gateway audit: request failed"
99        );
100        Ok(())
101    }
102}