defect_obs/langfuse/model.rs
1//! Wire types for the Langfuse ingestion API.
2//!
3//! Corresponds to `POST /api/public/ingestion`. The request body is `{ "batch": [
4//! <event>, ... ] }`,
5//! where each event is an envelope: `{ id, type, timestamp, body }`.
6//!
7//! - The envelope `id` is used for **deduplication** (unique per request).
8//! - `body.id` is the real trace / observation id (same id with different envelope id =
9//! update).
//! - Body struct fields are serialized in camelCase
//!   (`#[serde(rename_all = "camelCase")]`); event types use kebab-case and
//!   observation levels use UPPERCASE.
11//!
12//! Data model mirrors
13//! <https://langfuse.com/docs/observability/data-model>.
14//!
15//! We only cover the event types and fields needed for ingestion; the full Langfuse
16//! schema is wider.
//! Optional fields left as `None` are omitted from the serialized JSON
//! (`skip_serializing_if = "Option::is_none"`).
18
19use serde::{Deserialize, Serialize};
20
21/// The body of an ingestion request.
22#[derive(Debug, Clone, Serialize)]
23pub struct IngestionBatch {
24 pub batch: Vec<IngestionEvent>,
25}
26
27/// Response body for the ingestion endpoint.
28///
29/// **Note**: This endpoint **always returns 207 Multi-Status** for batch requests (even
30/// if every individual item succeeds); per-item results are split between `successes`
31/// (each with its own HTTP status, 201 on success) and `errors`. Therefore, whether an
32/// error actually occurred can only be determined by checking whether `errors` is
33/// non-empty — the 207 status code **cannot** be used for that purpose.
34#[derive(Debug, Clone, Deserialize)]
35pub struct IngestionResponse {
36 #[serde(default)]
37 pub successes: Vec<IngestionSuccess>,
38 #[serde(default)]
39 pub errors: Vec<IngestionError>,
40}
41
42/// A single successful ingestion result.
43#[derive(Debug, Clone, Deserialize)]
44pub struct IngestionSuccess {
45 pub id: String,
46 pub status: u16,
47}
48
49/// A single failure result. Fields are intentionally lenient — only used for diagnostic
50/// logging; unknown fields are ignored.
51#[derive(Debug, Clone, Deserialize)]
52pub struct IngestionError {
53 pub id: String,
54 pub status: u16,
55 #[serde(default)]
56 pub message: Option<String>,
57}
58
59/// A single event envelope. `type` is the oneOf discriminator; `body` varies by type.
60///
61/// Uses a flat `kind` (mapped to `"type"`) + generic `body: serde_json::Value` instead of
62/// an enum variant per type: the projector constructs the body JSON as needed, and the
63/// model layer only wraps the envelope. This way, adding new fields does not require
64/// changes to the model layer.
65#[derive(Debug, Clone, Serialize)]
66pub struct IngestionEvent {
67 /// Envelope ID, unique per request — Langfuse uses it for deduplication.
68 pub id: String,
69 /// A discriminant string for the event type, e.g. `trace-create` /
70 /// `generation-create`.
71 #[serde(rename = "type")]
72 pub kind: EventKind,
73 /// Timestamp when the event was produced (ISO-8601 / RFC3339).
74 pub timestamp: String,
75 /// Type-specific payload. `body.id` is the trace or observation id being operated on.
76 pub body: serde_json::Value,
77}
78
79/// Discriminant for ingestion event types.
80///
81/// Values follow the public Langfuse ingestion contract. `-create` and `-update` share
82/// the same body shape (same `body.id` = upsert/merge), which we rely on to implement
83/// "create first, fill in endTime later".
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
85#[serde(rename_all = "kebab-case")]
86pub enum EventKind {
87 TraceCreate,
88 GenerationCreate,
89 GenerationUpdate,
90 SpanCreate,
91 SpanUpdate,
92 EventCreate,
93}
94
95/// The level of an observation, determining UI state coloring and filtering.
96#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
97#[serde(rename_all = "UPPERCASE")]
98pub enum ObservationLevel {
99 Debug,
100 Default,
101 Warning,
102 Error,
103}
104
105/// Body for a trace (shared by `trace-create` and update).
106#[derive(Debug, Clone, Default, Serialize)]
107#[serde(rename_all = "camelCase")]
108pub struct TraceBody {
109 /// Trace ID (our turn-level UUID). Sending the same ID again acts as an update.
110 pub id: String,
111 #[serde(skip_serializing_if = "Option::is_none")]
112 pub name: Option<String>,
113 /// Links together multiple turn-traces belonging to the same defect session.
114 #[serde(skip_serializing_if = "Option::is_none")]
115 pub session_id: Option<String>,
116 #[serde(skip_serializing_if = "Option::is_none")]
117 pub input: Option<serde_json::Value>,
118 #[serde(skip_serializing_if = "Option::is_none")]
119 pub output: Option<serde_json::Value>,
120 #[serde(skip_serializing_if = "Option::is_none")]
121 pub metadata: Option<serde_json::Value>,
122 /// Deployment environment (e.g. `production`).
123 #[serde(skip_serializing_if = "Option::is_none")]
124 pub environment: Option<String>,
125 /// Trace start time (RFC3339).
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub timestamp: Option<String>,
128}
129
130/// Body of an observation (generation / span).
131///
132/// Generation and span share the same set of body fields — they differ only in the event
133/// `type` and whether `model` / `usageDetails` are present (span usually does not carry
134/// them).
135#[derive(Debug, Clone, Default, Serialize)]
136#[serde(rename_all = "camelCase")]
137pub struct ObservationBody {
138 /// Observation id. Sending the same id twice results in a merge (first create, then
139 /// update endTime).
140 pub id: String,
141 /// The trace ID this observation belongs to.
142 pub trace_id: String,
143 /// Parent observation; `None` means it is directly attached to the trace.
144 #[serde(skip_serializing_if = "Option::is_none")]
145 pub parent_observation_id: Option<String>,
146 #[serde(skip_serializing_if = "Option::is_none")]
147 pub name: Option<String>,
148 #[serde(skip_serializing_if = "Option::is_none")]
149 pub start_time: Option<String>,
150 #[serde(skip_serializing_if = "Option::is_none")]
151 pub end_time: Option<String>,
152 #[serde(skip_serializing_if = "Option::is_none")]
153 pub input: Option<serde_json::Value>,
154 #[serde(skip_serializing_if = "Option::is_none")]
155 pub output: Option<serde_json::Value>,
156 #[serde(skip_serializing_if = "Option::is_none")]
157 pub metadata: Option<serde_json::Value>,
158 #[serde(skip_serializing_if = "Option::is_none")]
159 pub level: Option<ObservationLevel>,
160 #[serde(skip_serializing_if = "Option::is_none")]
161 pub status_message: Option<String>,
162 #[serde(skip_serializing_if = "Option::is_none")]
163 pub environment: Option<String>,
164 // ---- generation-specific ----
165 /// The model name (generation only).
166 #[serde(skip_serializing_if = "Option::is_none")]
167 pub model: Option<String>,
168 /// Free-form token usage details: keys are arbitrary; if `total` is omitted, the
169 /// backend infers it.
170 /// We populate `input` / `output` / `cache_read_input_tokens` /
171 /// `cache_creation_input_tokens` (aligned with [`defect_agent::llm::Usage`]).
172 #[serde(skip_serializing_if = "Option::is_none")]
173 pub usage_details: Option<serde_json::Map<String, serde_json::Value>>,
174}
175
176impl IngestionEvent {
177 /// Wraps a trace event.
178 pub fn trace(
179 envelope_id: String,
180 timestamp: String,
181 kind: EventKind,
182 body: &TraceBody,
183 ) -> Self {
184 Self {
185 id: envelope_id,
186 kind,
187 timestamp,
188 body: serde_json::to_value(body).unwrap_or(serde_json::Value::Null),
189 }
190 }
191
192 /// Wrap an observation event (generation / span / event).
193 pub fn observation(
194 envelope_id: String,
195 timestamp: String,
196 kind: EventKind,
197 body: &ObservationBody,
198 ) -> Self {
199 Self {
200 id: envelope_id,
201 kind,
202 timestamp,
203 body: serde_json::to_value(body).unwrap_or(serde_json::Value::Null),
204 }
205 }
206}