Skip to main content

systemprompt_api/services/gateway/audit/
open.rs

//! Opening a gateway audit record: insert the request row, its payload, and the
//! canonical request messages.
3
4use anyhow::Result;
5use bytes::Bytes;
6use systemprompt_ai::models::ai_request_record::AiRequestRecord;
7use systemprompt_ai::repository::UpsertPayloadParams;
8
9use super::GatewayAudit;
10use super::message_text::flatten_message_content;
11use super::payload::slice_payload;
12use crate::services::gateway::protocol::canonical::{CanonicalRequest, Role};
13
14impl GatewayAudit {
15    fn build_record(&self) -> Result<AiRequestRecord> {
16        let mut record =
17            AiRequestRecord::builder(self.ctx.ai_request_id.clone(), self.ctx.user_id.clone())
18                .provider(self.ctx.provider.clone())
19                .model(self.ctx.model.clone())
20                .streaming(self.ctx.is_streaming);
21        if let Some(s) = &self.ctx.session_id {
22            record = record.session_id(s.clone());
23        }
24        if let Some(rm) = &self.ctx.requested_model {
25            record = record.requested_model(rm.clone());
26        }
27        record = record.context_id(self.ctx.context_id.clone());
28        if let Some(g) = &self.ctx.gateway_conversation_id {
29            record = record.gateway_conversation_id(g.clone());
30        }
31        if let Some(t) = &self.ctx.trace_id {
32            record = record.trace_id(t.clone());
33        }
34        if let Some(mt) = self.ctx.max_tokens {
35            record = record.max_tokens(mt);
36        }
37        record.build().map_err(anyhow::Error::from)
38    }
39
40    pub async fn open(&self, request: &CanonicalRequest, request_body: &Bytes) -> Result<()> {
41        let record = self.build_record()?;
42
43        self.requests
44            .insert_with_id(&self.ctx.ai_request_id, &record)
45            .await?;
46
47        let (body_json, excerpt, truncated, bytes) = slice_payload(request_body);
48        if let Err(e) = self
49            .payloads
50            .upsert_request(
51                &self.ctx.ai_request_id,
52                UpsertPayloadParams {
53                    body: body_json.as_ref(),
54                    excerpt: excerpt.as_deref(),
55                    truncated,
56                    bytes: Some(bytes),
57                },
58            )
59            .await
60        {
61            tracing::warn!(error = %e, ai_request_id = %self.ctx.ai_request_id, "payload insert (request) failed");
62        }
63
64        self.persist_request_messages(request).await;
65        Ok(())
66    }
67
68    async fn persist_request_messages(&self, request: &CanonicalRequest) {
69        let mut seq = 0i32;
70        if let Some(system) = &request.system {
71            if !system.is_empty() {
72                if let Err(e) = self
73                    .requests
74                    .insert_message(&self.ctx.ai_request_id, "system", system, seq)
75                    .await
76                {
77                    tracing::warn!(error = %e, "insert system message failed");
78                }
79                seq += 1;
80            }
81        }
82        for msg in &request.messages {
83            let role = match msg.role {
84                Role::System => "system",
85                Role::User => "user",
86                Role::Assistant => "assistant",
87                Role::Tool => "tool",
88            };
89            let text = flatten_message_content(&msg.content);
90            if let Err(e) = self
91                .requests
92                .insert_message(&self.ctx.ai_request_id, role, &text, seq)
93                .await
94            {
95                tracing::warn!(error = %e, seq, "insert message failed");
96            }
97            seq += 1;
98        }
99    }
100}