Skip to main content

defect_obs/langfuse/
model.rs

//! Wire types for the Langfuse ingestion API.
//!
//! Corresponds to `POST /api/public/ingestion`. The request body is
//! `{ "batch": [<event>, ...] }`, where each event is an envelope:
//! `{ id, type, timestamp, body }`.
//!
//! - The envelope `id` is used for **deduplication**: each distinct event needs its own
//!   unique envelope id.
//! - `body.id` is the actual trace / observation id; sending the same `body.id` again
//!   under a new envelope id updates that record.
//! - All body fields are serialized in camelCase (`#[serde(rename_all = "camelCase")]`).
//!
//! The data model mirrors <https://langfuse.com/docs/observability/data-model>.
//!
//! Only the event types and fields needed for ingestion are covered; the full Langfuse
//! schema is wider. Fields left as `None` are omitted from the payload
//! (`skip_serializing_if = "Option::is_none"`).
18
19use serde::{Deserialize, Serialize};
20
21/// The body of an ingestion request.
22#[derive(Debug, Clone, Serialize)]
23pub struct IngestionBatch {
24    pub batch: Vec<IngestionEvent>,
25}
26
27/// Response body for the ingestion endpoint.
28///
29/// **Note**: This endpoint **always returns 207 Multi-Status** for batch requests (even
30/// if every individual item succeeds); per-item results are split between `successes`
31/// (each with its own HTTP status, 201 on success) and `errors`. Therefore, whether an
32/// error actually occurred can only be determined by checking whether `errors` is
33/// non-empty — the 207 status code **cannot** be used for that purpose.
34#[derive(Debug, Clone, Deserialize)]
35pub struct IngestionResponse {
36    #[serde(default)]
37    pub successes: Vec<IngestionSuccess>,
38    #[serde(default)]
39    pub errors: Vec<IngestionError>,
40}
41
42/// A single successful ingestion result.
43#[derive(Debug, Clone, Deserialize)]
44pub struct IngestionSuccess {
45    pub id: String,
46    pub status: u16,
47}
48
49/// A single failure result. Fields are intentionally lenient — only used for diagnostic
50/// logging; unknown fields are ignored.
51#[derive(Debug, Clone, Deserialize)]
52pub struct IngestionError {
53    pub id: String,
54    pub status: u16,
55    #[serde(default)]
56    pub message: Option<String>,
57}
58
59/// A single event envelope. `type` is the oneOf discriminator; `body` varies by type.
60///
61/// Uses a flat `kind` (mapped to `"type"`) + generic `body: serde_json::Value` instead of
62/// an enum variant per type: the projector constructs the body JSON as needed, and the
63/// model layer only wraps the envelope. This way, adding new fields does not require
64/// changes to the model layer.
65#[derive(Debug, Clone, Serialize)]
66pub struct IngestionEvent {
67    /// Envelope ID, unique per request — Langfuse uses it for deduplication.
68    pub id: String,
69    /// A discriminant string for the event type, e.g. `trace-create` /
70    /// `generation-create`.
71    #[serde(rename = "type")]
72    pub kind: EventKind,
73    /// Timestamp when the event was produced (ISO-8601 / RFC3339).
74    pub timestamp: String,
75    /// Type-specific payload. `body.id` is the trace or observation id being operated on.
76    pub body: serde_json::Value,
77}
78
79/// Discriminant for ingestion event types.
80///
81/// Values follow the public Langfuse ingestion contract. `-create` and `-update` share
82/// the same body shape (same `body.id` = upsert/merge), which we rely on to implement
83/// "create first, fill in endTime later".
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
85#[serde(rename_all = "kebab-case")]
86pub enum EventKind {
87    TraceCreate,
88    GenerationCreate,
89    GenerationUpdate,
90    SpanCreate,
91    SpanUpdate,
92    EventCreate,
93}
94
95/// The level of an observation, determining UI state coloring and filtering.
96#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
97#[serde(rename_all = "UPPERCASE")]
98pub enum ObservationLevel {
99    Debug,
100    Default,
101    Warning,
102    Error,
103}
104
105/// Body for a trace (shared by `trace-create` and update).
106#[derive(Debug, Clone, Default, Serialize)]
107#[serde(rename_all = "camelCase")]
108pub struct TraceBody {
109    /// Trace ID (our turn-level UUID). Sending the same ID again acts as an update.
110    pub id: String,
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub name: Option<String>,
113    /// Links together multiple turn-traces belonging to the same defect session.
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub session_id: Option<String>,
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub input: Option<serde_json::Value>,
118    #[serde(skip_serializing_if = "Option::is_none")]
119    pub output: Option<serde_json::Value>,
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub metadata: Option<serde_json::Value>,
122    /// Deployment environment (e.g. `production`).
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub environment: Option<String>,
125    /// Trace start time (RFC3339).
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub timestamp: Option<String>,
128}
129
130/// Body of an observation (generation / span).
131///
132/// Generation and span share the same set of body fields — they differ only in the event
133/// `type` and whether `model` / `usageDetails` are present (span usually does not carry
134/// them).
135#[derive(Debug, Clone, Default, Serialize)]
136#[serde(rename_all = "camelCase")]
137pub struct ObservationBody {
138    /// Observation id. Sending the same id twice results in a merge (first create, then
139    /// update endTime).
140    pub id: String,
141    /// The trace ID this observation belongs to.
142    pub trace_id: String,
143    /// Parent observation; `None` means it is directly attached to the trace.
144    #[serde(skip_serializing_if = "Option::is_none")]
145    pub parent_observation_id: Option<String>,
146    #[serde(skip_serializing_if = "Option::is_none")]
147    pub name: Option<String>,
148    #[serde(skip_serializing_if = "Option::is_none")]
149    pub start_time: Option<String>,
150    #[serde(skip_serializing_if = "Option::is_none")]
151    pub end_time: Option<String>,
152    #[serde(skip_serializing_if = "Option::is_none")]
153    pub input: Option<serde_json::Value>,
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub output: Option<serde_json::Value>,
156    #[serde(skip_serializing_if = "Option::is_none")]
157    pub metadata: Option<serde_json::Value>,
158    #[serde(skip_serializing_if = "Option::is_none")]
159    pub level: Option<ObservationLevel>,
160    #[serde(skip_serializing_if = "Option::is_none")]
161    pub status_message: Option<String>,
162    #[serde(skip_serializing_if = "Option::is_none")]
163    pub environment: Option<String>,
164    // ---- generation-specific ----
165    /// The model name (generation only).
166    #[serde(skip_serializing_if = "Option::is_none")]
167    pub model: Option<String>,
168    /// Free-form token usage details: keys are arbitrary; if `total` is omitted, the
169    /// backend infers it.
170    /// We populate `input` / `output` / `cache_read_input_tokens` /
171    /// `cache_creation_input_tokens` (aligned with [`defect_agent::llm::Usage`]).
172    #[serde(skip_serializing_if = "Option::is_none")]
173    pub usage_details: Option<serde_json::Map<String, serde_json::Value>>,
174}
175
176impl IngestionEvent {
177    /// Wraps a trace event.
178    pub fn trace(
179        envelope_id: String,
180        timestamp: String,
181        kind: EventKind,
182        body: &TraceBody,
183    ) -> Self {
184        Self {
185            id: envelope_id,
186            kind,
187            timestamp,
188            body: serde_json::to_value(body).unwrap_or(serde_json::Value::Null),
189        }
190    }
191
192    /// Wrap an observation event (generation / span / event).
193    pub fn observation(
194        envelope_id: String,
195        timestamp: String,
196        kind: EventKind,
197        body: &ObservationBody,
198    ) -> Self {
199        Self {
200            id: envelope_id,
201            kind,
202            timestamp,
203            body: serde_json::to_value(body).unwrap_or(serde_json::Value::Null),
204        }
205    }
206}