systemprompt_api/services/gateway/audit/
open.rs1use anyhow::Result;
5use bytes::Bytes;
6use systemprompt_ai::models::ai_request_record::AiRequestRecord;
7use systemprompt_ai::repository::UpsertPayloadParams;
8
9use super::GatewayAudit;
10use super::message_text::flatten_message_content;
11use super::payload::slice_payload;
12use crate::services::gateway::protocol::canonical::{CanonicalRequest, Role};
13
14impl GatewayAudit {
15 fn build_record(&self) -> Result<AiRequestRecord> {
16 let mut record =
17 AiRequestRecord::builder(self.ctx.ai_request_id.clone(), self.ctx.user_id.clone())
18 .provider(self.ctx.provider.clone())
19 .model(self.ctx.model.clone())
20 .streaming(self.ctx.is_streaming);
21 if let Some(s) = &self.ctx.session_id {
22 record = record.session_id(s.clone());
23 }
24 if let Some(rm) = &self.ctx.requested_model {
25 record = record.requested_model(rm.clone());
26 }
27 record = record.context_id(self.ctx.context_id.clone());
28 if let Some(g) = &self.ctx.gateway_conversation_id {
29 record = record.gateway_conversation_id(g.clone());
30 }
31 if let Some(t) = &self.ctx.trace_id {
32 record = record.trace_id(t.clone());
33 }
34 if let Some(mt) = self.ctx.max_tokens {
35 record = record.max_tokens(mt);
36 }
37 record.build().map_err(anyhow::Error::from)
38 }
39
40 pub async fn open(&self, request: &CanonicalRequest, request_body: &Bytes) -> Result<()> {
41 let record = self.build_record()?;
42
43 self.requests
44 .insert_with_id(&self.ctx.ai_request_id, &record)
45 .await?;
46
47 let (body_json, excerpt, truncated, bytes) = slice_payload(request_body);
48 if let Err(e) = self
49 .payloads
50 .upsert_request(
51 &self.ctx.ai_request_id,
52 UpsertPayloadParams {
53 body: body_json.as_ref(),
54 excerpt: excerpt.as_deref(),
55 truncated,
56 bytes: Some(bytes),
57 },
58 )
59 .await
60 {
61 tracing::warn!(error = %e, ai_request_id = %self.ctx.ai_request_id, "payload insert (request) failed");
62 }
63
64 self.persist_request_messages(request).await;
65 Ok(())
66 }
67
68 async fn persist_request_messages(&self, request: &CanonicalRequest) {
69 let mut seq = 0i32;
70 if let Some(system) = &request.system {
71 if !system.is_empty() {
72 if let Err(e) = self
73 .requests
74 .insert_message(&self.ctx.ai_request_id, "system", system, seq)
75 .await
76 {
77 tracing::warn!(error = %e, "insert system message failed");
78 }
79 seq += 1;
80 }
81 }
82 for msg in &request.messages {
83 let role = match msg.role {
84 Role::System => "system",
85 Role::User => "user",
86 Role::Assistant => "assistant",
87 Role::Tool => "tool",
88 };
89 let text = flatten_message_content(&msg.content);
90 if let Err(e) = self
91 .requests
92 .insert_message(&self.ctx.ai_request_id, role, &text, seq)
93 .await
94 {
95 tracing::warn!(error = %e, seq, "insert message failed");
96 }
97 seq += 1;
98 }
99 }
100}