Skip to main content

systemprompt_api/services/gateway/audit/
mod.rs

1//! Persistence of gateway request lifecycle to the AI-request audit trail.
2//!
3//! [`GatewayAudit`] opens a record when a request arrives (see the `open`
4//! submodule), records the canonical messages and request payload, then closes
5//! it on completion with token usage, resolved cost, latency, captured tool
6//! calls, and the response payload (see the `complete` submodule) — or marks it
7//! failed. [`GatewayRequestContext`] carries the identifiers and routing
8//! metadata bound to a single request.
9
10mod complete;
11mod message_text;
12mod open;
13pub mod payload;
14
15use std::sync::{Arc, Mutex};
16use std::time::Instant;
17
18use anyhow::Result;
19use systemprompt_ai::repository::{AiRequestPayloadRepository, AiRequestRepository};
20use systemprompt_database::DbPool;
21use systemprompt_identifiers::{
22    AiRequestId, ContextId, GatewayConversationId, SessionId, TraceId, UserId,
23};
24
25#[derive(Debug, Clone)]
26pub struct GatewayRequestContext {
27    pub ai_request_id: AiRequestId,
28    pub user_id: UserId,
29    pub session_id: Option<SessionId>,
30    pub context_id: ContextId,
31    pub gateway_conversation_id: Option<GatewayConversationId>,
32    pub trace_id: Option<TraceId>,
33    pub provider: String,
34    /// The upstream model the request dispatches to (after route rewrite). The
35    /// audit `model` column is opened from this, then overwritten by
36    /// `set_served_model` with the model the provider echoes back.
37    pub model: String,
38    /// The model the client requested on the wire, before route rewrite.
39    /// Persisted to `ai_requests.requested_model` so an audit retains both.
40    pub requested_model: Option<String>,
41    pub max_tokens: Option<u32>,
42    pub is_streaming: bool,
43    pub wire_protocol: String,
44}
45
46#[expect(
47    missing_debug_implementations,
48    reason = "service type holds repository clients that intentionally do not implement Debug"
49)]
50pub struct GatewayAudit {
51    requests: Arc<AiRequestRepository>,
52    payloads: Arc<AiRequestPayloadRepository>,
53    pub ctx: GatewayRequestContext,
54    served_model: Mutex<Option<String>>,
55    started_at: Instant,
56}
57
58impl GatewayAudit {
59    pub fn new(
60        db: &DbPool,
61        ctx: GatewayRequestContext,
62    ) -> Result<Self, systemprompt_ai::error::RepositoryError> {
63        let requests = Arc::new(AiRequestRepository::new(db)?);
64        let payloads = Arc::new(AiRequestPayloadRepository::new(db)?);
65        Ok(Self {
66            requests,
67            payloads,
68            ctx,
69            served_model: Mutex::new(None),
70            started_at: Instant::now(),
71        })
72    }
73
74    pub async fn set_served_model(&self, model: &str) {
75        if model.is_empty() || model == self.ctx.model {
76            return;
77        }
78        if let Ok(mut slot) = self.served_model.lock() {
79            *slot = Some(model.to_owned());
80        }
81        if let Err(e) = self
82            .requests
83            .update_model(&self.ctx.ai_request_id, model)
84            .await
85        {
86            tracing::warn!(error = %e, "update_model failed");
87        }
88    }
89
90    pub async fn fail(&self, error: &str) -> Result<()> {
91        if let Err(e) = self
92            .requests
93            .update_error(&self.ctx.ai_request_id, error)
94            .await
95        {
96            tracing::warn!(error = %e, "audit fail update failed");
97        }
98        tracing::info!(
99            ai_request_id = %self.ctx.ai_request_id,
100            user_id = %self.ctx.user_id,
101            provider = %self.ctx.provider,
102            model = %self.ctx.model,
103            error,
104            "Gateway audit: request failed"
105        );
106        Ok(())
107    }
108}