1use std::sync::Arc;
2
3use axum::Json;
4use axum::extract::{Path, Query, State};
5use serde::Deserialize;
6
7use lago_core::EventQuery;
8use lago_core::event::EventPayload;
9use lago_core::id::{BranchId, SessionId};
10use lago_fs::diff::{self, DiffEntry};
11
12use crate::error::ApiError;
13use crate::state::AppState;
14
15#[derive(Deserialize)]
19pub struct DiffQuery {
20 pub from: String,
22 #[serde(default)]
25 pub to: Option<String>,
26 #[serde(default = "default_branch")]
28 pub branch: String,
29}
30
31fn default_branch() -> String {
32 "main".to_string()
33}
34
35pub async fn get_diff(
45 State(state): State<Arc<AppState>>,
46 Path(session_id): Path<String>,
47 Query(query): Query<DiffQuery>,
48) -> Result<Json<Vec<DiffEntry>>, ApiError> {
49 let session_id = SessionId::from_string(session_id.clone());
50
51 state
53 .journal
54 .get_session(&session_id)
55 .await?
56 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
57
58 let from_manifest =
59 resolve_ref_to_manifest(&state, &session_id, &query.from, &query.branch).await?;
60 let to_ref = query.to.unwrap_or_else(|| query.branch.clone());
61 let to_manifest = resolve_ref_to_manifest(&state, &session_id, &to_ref, &query.branch).await?;
62
63 let diff_entries = diff::diff(&from_manifest, &to_manifest);
64 Ok(Json(diff_entries))
65}
66
67async fn resolve_ref_to_manifest(
71 state: &Arc<AppState>,
72 session_id: &SessionId,
73 reference: &str,
74 default_branch: &str,
75) -> Result<lago_fs::Manifest, ApiError> {
76 if let Ok(seq) = reference.parse::<u64>() {
78 return build_manifest_at_seq(state, session_id, default_branch, seq).await;
79 }
80
81 if let Some(snap_name) = reference.strip_prefix("snap:") {
83 return build_manifest_at_snapshot(state, session_id, snap_name).await;
84 }
85
86 let branch_id = BranchId::from_string(reference);
88 build_manifest_at_head(state, session_id, &branch_id).await
89}
90
91async fn build_manifest_at_head(
93 state: &Arc<AppState>,
94 session_id: &SessionId,
95 branch_id: &BranchId,
96) -> Result<lago_fs::Manifest, ApiError> {
97 let query = EventQuery::new()
98 .session(session_id.clone())
99 .branch(branch_id.clone());
100 let events = state.journal.read(query).await?;
101 Ok(build_manifest_from_events(&events))
102}
103
104async fn build_manifest_at_seq(
106 state: &Arc<AppState>,
107 session_id: &SessionId,
108 branch: &str,
109 max_seq: u64,
110) -> Result<lago_fs::Manifest, ApiError> {
111 let branch_id = BranchId::from_string(branch);
112 let query = EventQuery::new()
113 .session(session_id.clone())
114 .branch(branch_id);
115 let events = state.journal.read(query).await?;
116
117 let mut manifest = lago_fs::Manifest::new();
118 for event in &events {
119 if event.seq > max_seq {
120 break;
121 }
122 apply_event_to_manifest(&mut manifest, event);
123 }
124 Ok(manifest)
125}
126
127async fn build_manifest_at_snapshot(
132 state: &Arc<AppState>,
133 session_id: &SessionId,
134 snapshot_name: &str,
135) -> Result<lago_fs::Manifest, ApiError> {
136 let query = EventQuery::new()
138 .session(session_id.clone())
139 .with_kind("SnapshotCreated");
140 let events = state.journal.read(query).await?;
141
142 let mut covers_seq = None;
143 let mut branch = "main".to_string();
144
145 for event in &events {
146 if let EventPayload::SnapshotCreated {
147 snapshot_id,
148 covers_through_seq,
149 ..
150 } = &event.payload
151 {
152 if snapshot_id.as_str() == snapshot_name {
153 covers_seq = Some(*covers_through_seq);
154 branch = event.branch_id.as_str().to_string();
155 break;
156 }
157 }
158 }
159
160 let seq = covers_seq
161 .ok_or_else(|| ApiError::NotFound(format!("snapshot not found: {snapshot_name}")))?;
162
163 build_manifest_at_seq(state, session_id, &branch, seq).await
164}
165
166fn build_manifest_from_events(events: &[lago_core::event::EventEnvelope]) -> lago_fs::Manifest {
168 let mut manifest = lago_fs::Manifest::new();
169 for event in events {
170 apply_event_to_manifest(&mut manifest, event);
171 }
172 manifest
173}
174
175fn apply_event_to_manifest(
176 manifest: &mut lago_fs::Manifest,
177 event: &lago_core::event::EventEnvelope,
178) {
179 match &event.payload {
180 EventPayload::FileWrite {
181 path,
182 blob_hash,
183 size_bytes,
184 content_type,
185 } => {
186 manifest.apply_write(
187 path.clone(),
188 lago_core::BlobHash::from_hex(blob_hash.as_str()),
189 *size_bytes,
190 content_type.clone(),
191 event.timestamp,
192 );
193 }
194 EventPayload::FileDelete { path } => {
195 manifest.apply_delete(path);
196 }
197 EventPayload::FileRename { old_path, new_path } => {
198 manifest.apply_rename(old_path, new_path.clone());
199 }
200 _ => {}
201 }
202}