Skip to main content

systemprompt_api/services/gateway/audit/
complete.rs

1//! Closing a gateway audit record: completion metrics, tool calls, and the
2//! response payload.
3
4use anyhow::Result;
5use bytes::Bytes;
6use systemprompt_ai::repository::ai_requests::UpdateCompletionParams;
7use systemprompt_ai::repository::{InsertToolCallParams, UpsertPayloadParams};
8
9use super::GatewayAudit;
10use super::payload::{slice_payload, truncate_for_tool_input};
11use crate::services::gateway::captures::{CapturedToolUse, CapturedUsage};
12use crate::services::gateway::pricing;
13use crate::services::gateway::protocol::canonical_response::CanonicalResponse;
14
15impl GatewayAudit {
16    fn effective_model(&self) -> String {
17        self.served_model
18            .lock()
19            .map_err(|e| {
20                tracing::warn!(error = %e, "served_model mutex poisoned");
21                e
22            })
23            .ok()
24            .and_then(|s| s.clone())
25            .unwrap_or_else(|| self.ctx.model.clone())
26    }
27
28    pub async fn complete(
29        &self,
30        usage: CapturedUsage,
31        tool_calls: Vec<CapturedToolUse>,
32        response: &CanonicalResponse,
33        response_body: &Bytes,
34    ) -> Result<()> {
35        let latency_ms = self.started_at.elapsed().as_millis().min(i32::MAX as u128) as i32;
36        let effective_model = self.effective_model();
37        let profile = systemprompt_config::ProfileBootstrap::get().ok();
38        let gateway = profile
39            .as_ref()
40            .and_then(|p| p.gateway.as_ref())
41            .and_then(systemprompt_models::profile::GatewayState::resolved);
42        let empty_registry = systemprompt_models::profile::ProviderRegistry::default();
43        let registry = profile.as_ref().map_or(&empty_registry, |p| &p.providers);
44        let pricing_rates =
45            pricing::resolve(&self.ctx.provider, &effective_model, gateway, registry);
46        let cost =
47            pricing::cost_microdollars(pricing_rates, usage.input_tokens, usage.output_tokens);
48
49        self.requests
50            .update_completion(UpdateCompletionParams {
51                id: self.ctx.ai_request_id.clone(),
52                tokens_used: (usage.input_tokens + usage.output_tokens) as i32,
53                input_tokens: usage.input_tokens as i32,
54                output_tokens: usage.output_tokens as i32,
55                cost_microdollars: cost,
56                latency_ms,
57            })
58            .await?;
59
60        self.persist_tool_calls(&tool_calls).await;
61        self.persist_response(response, response_body).await;
62
63        tracing::info!(
64            ai_request_id = %self.ctx.ai_request_id,
65            user_id = %self.ctx.user_id,
66            provider = %self.ctx.provider,
67            model = %effective_model,
68            wire_protocol = %self.ctx.wire_protocol,
69            input_tokens = usage.input_tokens,
70            output_tokens = usage.output_tokens,
71            cost_microdollars = cost,
72            latency_ms,
73            tool_calls = tool_calls.len(),
74            "Gateway audit: request completed"
75        );
76        Ok(())
77    }
78
79    async fn persist_response(&self, response: &CanonicalResponse, response_body: &Bytes) {
80        let (body_json, excerpt, truncated, bytes) = slice_payload(response_body);
81        if let Err(e) = self
82            .payloads
83            .upsert_response(
84                &self.ctx.ai_request_id,
85                UpsertPayloadParams {
86                    body: body_json.as_ref(),
87                    excerpt: excerpt.as_deref(),
88                    truncated,
89                    bytes: Some(bytes),
90                },
91            )
92            .await
93        {
94            tracing::warn!(error = %e, ai_request_id = %self.ctx.ai_request_id, "payload insert (response) failed");
95        }
96
97        if let Some(assistant_text) = super::super::parse::extract_assistant_text(response) {
98            if let Err(e) = self
99                .requests
100                .add_response_message(&self.ctx.ai_request_id, &assistant_text)
101                .await
102            {
103                tracing::warn!(error = %e, "assistant response message insert failed");
104            }
105        }
106    }
107
108    async fn persist_tool_calls(&self, tool_calls: &[CapturedToolUse]) {
109        for (idx, tool) in tool_calls.iter().enumerate() {
110            let seq = idx as i32 + 1;
111            let trimmed = truncate_for_tool_input(&tool.tool_input);
112            if let Err(e) = self
113                .requests
114                .insert_tool_call(InsertToolCallParams {
115                    request_id: &self.ctx.ai_request_id,
116                    ai_tool_call_id: tool.ai_tool_call_id.as_str(),
117                    tool_name: &tool.tool_name,
118                    tool_input: &trimmed,
119                    sequence_number: seq,
120                })
121                .await
122            {
123                tracing::warn!(error = %e, seq, "tool_call insert failed");
124            }
125        }
126    }
127}