Skip to main content

lago_api/routes/
branches.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use axum::Json;
5use axum::extract::{Path, State};
6use serde::{Deserialize, Serialize};
7
8use lago_core::event::{EventEnvelope, EventPayload};
9use lago_core::id::{BranchId, EventId, SeqNo, SessionId};
10
11use crate::error::ApiError;
12use crate::state::AppState;
13
14// --- Request / Response types
15
16#[derive(Deserialize)]
17pub struct CreateBranchRequest {
18    pub name: String,
19    #[serde(default)]
20    pub fork_point_seq: Option<SeqNo>,
21}
22
23#[derive(Serialize)]
24pub struct BranchResponse {
25    pub branch_id: String,
26    pub name: String,
27    pub fork_point_seq: SeqNo,
28}
29
30// --- Handlers
31
32/// POST /v1/sessions/:id/branches
33///
34/// Creates a new branch forked from the session's "main" branch at the
35/// given sequence number (defaults to the current head).
36pub async fn create_branch(
37    State(state): State<Arc<AppState>>,
38    Path(session_id): Path<String>,
39    Json(body): Json<CreateBranchRequest>,
40) -> Result<(axum::http::StatusCode, Json<BranchResponse>), ApiError> {
41    let session_id = SessionId::from_string(session_id.clone());
42    let main_branch = BranchId::from_string("main");
43
44    // Verify the session exists
45    state
46        .journal
47        .get_session(&session_id)
48        .await?
49        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
50
51    // Determine fork point: use the provided seq or the current head
52    let fork_point_seq = match body.fork_point_seq {
53        Some(seq) => seq,
54        None => state.journal.head_seq(&session_id, &main_branch).await?,
55    };
56
57    let new_branch_id = BranchId::new();
58
59    // Emit a BranchCreated event on the main branch
60    let event = EventEnvelope {
61        event_id: EventId::new(),
62        session_id: session_id.clone(),
63        branch_id: main_branch.clone(),
64        run_id: None,
65        seq: 0, // Will be assigned by the journal
66        timestamp: EventEnvelope::now_micros(),
67        parent_id: None,
68        payload: EventPayload::BranchCreated {
69            new_branch_id: new_branch_id.clone().into(),
70            fork_point_seq,
71            name: body.name.clone(),
72        },
73        metadata: HashMap::new(),
74        schema_version: 1,
75    };
76
77    state.journal.append(event).await?;
78
79    Ok((
80        axum::http::StatusCode::CREATED,
81        Json(BranchResponse {
82            branch_id: new_branch_id.to_string(),
83            name: body.name,
84            fork_point_seq,
85        }),
86    ))
87}
88
89/// GET /v1/sessions/:id/branches
90///
91/// Lists all branches for a session. Currently reads BranchCreated events
92/// from the journal to reconstruct the branch list.
93pub async fn list_branches(
94    State(state): State<Arc<AppState>>,
95    Path(session_id): Path<String>,
96) -> Result<Json<Vec<BranchResponse>>, ApiError> {
97    let session_id = SessionId::from_string(session_id.clone());
98
99    // Verify the session exists
100    let _session = state
101        .journal
102        .get_session(&session_id)
103        .await?
104        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
105
106    // Read all events for this session to find BranchCreated events
107    let query = lago_core::EventQuery::new().session(session_id.clone());
108    let events = state.journal.read(query).await?;
109
110    let mut branches: Vec<BranchResponse> = Vec::new();
111
112    // The "main" branch always exists for a session
113    branches.push(BranchResponse {
114        branch_id: "main".to_string(),
115        name: "main".to_string(),
116        fork_point_seq: 0,
117    });
118
119    // Extract BranchCreated events
120    for event in &events {
121        if let EventPayload::BranchCreated {
122            ref new_branch_id,
123            fork_point_seq,
124            ref name,
125        } = event.payload
126        {
127            branches.push(BranchResponse {
128                branch_id: new_branch_id.as_str().to_string(),
129                name: name.clone(),
130                fork_point_seq,
131            });
132        }
133    }
134
135    Ok(Json(branches))
136}