systemprompt_api/services/gateway/audit/
complete.rs1use anyhow::Result;
5use bytes::Bytes;
6use systemprompt_ai::repository::ai_requests::UpdateCompletionParams;
7use systemprompt_ai::repository::{InsertToolCallParams, UpsertPayloadParams};
8
9use super::GatewayAudit;
10use super::payload::{slice_payload, truncate_for_tool_input};
11use crate::services::gateway::captures::{CapturedToolUse, CapturedUsage};
12use crate::services::gateway::pricing;
13use crate::services::gateway::protocol::canonical_response::CanonicalResponse;
14
15impl GatewayAudit {
16 fn effective_model(&self) -> String {
17 self.served_model
18 .lock()
19 .map_err(|e| {
20 tracing::warn!(error = %e, "served_model mutex poisoned");
21 e
22 })
23 .ok()
24 .and_then(|s| s.clone())
25 .unwrap_or_else(|| self.ctx.model.clone())
26 }
27
28 pub async fn complete(
29 &self,
30 usage: CapturedUsage,
31 tool_calls: Vec<CapturedToolUse>,
32 response: &CanonicalResponse,
33 response_body: &Bytes,
34 ) -> Result<()> {
35 let latency_ms = self.started_at.elapsed().as_millis().min(i32::MAX as u128) as i32;
36 let effective_model = self.effective_model();
37 let profile = systemprompt_config::ProfileBootstrap::get().ok();
38 let gateway = profile
39 .as_ref()
40 .and_then(|p| p.gateway.as_ref())
41 .and_then(systemprompt_models::profile::GatewayState::resolved);
42 let empty_registry = systemprompt_models::profile::ProviderRegistry::default();
43 let registry = profile.as_ref().map_or(&empty_registry, |p| &p.providers);
44 let pricing_rates =
45 pricing::resolve(&self.ctx.provider, &effective_model, gateway, registry);
46 let cost =
47 pricing::cost_microdollars(pricing_rates, usage.input_tokens, usage.output_tokens);
48
49 self.requests
50 .update_completion(UpdateCompletionParams {
51 id: self.ctx.ai_request_id.clone(),
52 tokens_used: (usage.input_tokens + usage.output_tokens) as i32,
53 input_tokens: usage.input_tokens as i32,
54 output_tokens: usage.output_tokens as i32,
55 cost_microdollars: cost,
56 latency_ms,
57 })
58 .await?;
59
60 self.persist_tool_calls(&tool_calls).await;
61 self.persist_response(response, response_body).await;
62
63 tracing::info!(
64 ai_request_id = %self.ctx.ai_request_id,
65 user_id = %self.ctx.user_id,
66 provider = %self.ctx.provider,
67 model = %effective_model,
68 wire_protocol = %self.ctx.wire_protocol,
69 input_tokens = usage.input_tokens,
70 output_tokens = usage.output_tokens,
71 cost_microdollars = cost,
72 latency_ms,
73 tool_calls = tool_calls.len(),
74 "Gateway audit: request completed"
75 );
76 Ok(())
77 }
78
79 async fn persist_response(&self, response: &CanonicalResponse, response_body: &Bytes) {
80 let (body_json, excerpt, truncated, bytes) = slice_payload(response_body);
81 if let Err(e) = self
82 .payloads
83 .upsert_response(
84 &self.ctx.ai_request_id,
85 UpsertPayloadParams {
86 body: body_json.as_ref(),
87 excerpt: excerpt.as_deref(),
88 truncated,
89 bytes: Some(bytes),
90 },
91 )
92 .await
93 {
94 tracing::warn!(error = %e, ai_request_id = %self.ctx.ai_request_id, "payload insert (response) failed");
95 }
96
97 if let Some(assistant_text) = super::super::parse::extract_assistant_text(response) {
98 if let Err(e) = self
99 .requests
100 .add_response_message(&self.ctx.ai_request_id, &assistant_text)
101 .await
102 {
103 tracing::warn!(error = %e, "assistant response message insert failed");
104 }
105 }
106 }
107
108 async fn persist_tool_calls(&self, tool_calls: &[CapturedToolUse]) {
109 for (idx, tool) in tool_calls.iter().enumerate() {
110 let seq = idx as i32 + 1;
111 let trimmed = truncate_for_tool_input(&tool.tool_input);
112 if let Err(e) = self
113 .requests
114 .insert_tool_call(InsertToolCallParams {
115 request_id: &self.ctx.ai_request_id,
116 ai_tool_call_id: tool.ai_tool_call_id.as_str(),
117 tool_name: &tool.tool_name,
118 tool_input: &trimmed,
119 sequence_number: seq,
120 })
121 .await
122 {
123 tracing::warn!(error = %e, seq, "tool_call insert failed");
124 }
125 }
126 }
127}