1use std::collections::HashMap;
2use std::sync::Arc;
3
4use axum::Json;
5use axum::body::Bytes;
6use axum::extract::{Path, Query, State};
7use axum::http::StatusCode;
8use serde::{Deserialize, Serialize};
9
10use lago_core::event::{EventEnvelope, EventPayload};
11use lago_core::hashline::{HashLineEdit, HashLineFile};
12use lago_core::id::{BranchId, EventId, SessionId};
13use lago_core::{EventQuery, ManifestEntry};
14
15use crate::error::ApiError;
16use crate::state::AppState;
17
18#[derive(Serialize)]
21pub struct FileWriteResponse {
22 pub path: String,
23 pub blob_hash: String,
24 pub size_bytes: u64,
25}
26
27#[derive(Serialize)]
28pub struct ManifestResponse {
29 pub session_id: String,
30 pub entries: Vec<ManifestEntry>,
31}
32
33#[derive(Debug, Deserialize, Default)]
36pub struct FileReadQuery {
37 #[serde(default)]
39 pub format: Option<String>,
40 #[serde(default = "default_branch")]
42 pub branch: String,
43}
44
45#[derive(Debug, Deserialize, Default)]
46pub struct BranchQuery {
47 #[serde(default = "default_branch")]
49 pub branch: String,
50}
51
/// Serde default for the `branch` query parameter: the `"main"` branch.
fn default_branch() -> String {
    String::from("main")
}
55
56pub async fn read_file(
66 State(state): State<Arc<AppState>>,
67 Path((session_id, file_path)): Path<(String, String)>,
68 Query(query): Query<FileReadQuery>,
69) -> Result<axum::http::Response<axum::body::Body>, ApiError> {
70 let session_id = SessionId::from_string(session_id.clone());
71 let branch_id = BranchId::from_string(query.branch.clone());
72 let file_path = normalize_path(&file_path);
73
74 state
76 .journal
77 .get_session(&session_id)
78 .await?
79 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
80
81 let manifest = build_manifest(&state, &session_id, &branch_id).await?;
83
84 let entry = manifest
85 .iter()
86 .find(|e| e.path == file_path)
87 .ok_or_else(|| ApiError::NotFound(format!("file not found: {file_path}")))?;
88
89 let data = state
90 .blob_store
91 .get(&entry.blob_hash)
92 .map_err(|e| ApiError::Internal(format!("failed to read blob: {e}")))?;
93
94 if query.format.as_deref() == Some("hashline") {
96 let text = String::from_utf8(data)
97 .map_err(|_| ApiError::BadRequest("file is not valid UTF-8".to_string()))?;
98 let hashline_file = HashLineFile::from_content(&text);
99 let hashline_text = hashline_file.to_hashline_text();
100
101 return Ok(axum::http::Response::builder()
102 .status(StatusCode::OK)
103 .header("content-type", "text/plain")
104 .header("x-format", "hashline")
105 .header("x-blob-hash", entry.blob_hash.as_str())
106 .body(axum::body::Body::from(hashline_text))
107 .unwrap());
108 }
109
110 let content_type = entry
111 .content_type
112 .clone()
113 .unwrap_or_else(|| "application/octet-stream".to_string());
114
115 Ok(axum::http::Response::builder()
116 .status(StatusCode::OK)
117 .header("content-type", content_type)
118 .header("x-blob-hash", entry.blob_hash.as_str())
119 .body(axum::body::Body::from(data))
120 .unwrap())
121}
122
123pub async fn patch_file(
129 State(state): State<Arc<AppState>>,
130 Path((session_id, file_path)): Path<(String, String)>,
131 Query(branch): Query<BranchQuery>,
132 Json(edits): Json<Vec<HashLineEdit>>,
133) -> Result<(StatusCode, Json<FileWriteResponse>), ApiError> {
134 let session_id = SessionId::from_string(session_id.clone());
135 let branch_id = BranchId::from_string(branch.branch);
136 let file_path = normalize_path(&file_path);
137
138 state
140 .journal
141 .get_session(&session_id)
142 .await?
143 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
144
145 let manifest = build_manifest(&state, &session_id, &branch_id).await?;
147 let entry = manifest
148 .iter()
149 .find(|e| e.path == file_path)
150 .ok_or_else(|| ApiError::NotFound(format!("file not found: {file_path}")))?;
151
152 let data = state
153 .blob_store
154 .get(&entry.blob_hash)
155 .map_err(|e| ApiError::Internal(format!("failed to read blob: {e}")))?;
156
157 let text = String::from_utf8(data)
158 .map_err(|_| ApiError::BadRequest("file is not valid UTF-8".to_string()))?;
159
160 let hashline_file = HashLineFile::from_content(&text);
162 let new_content = hashline_file
163 .apply_edits(&edits)
164 .map_err(lago_core::LagoError::from)?;
165
166 let blob_hash = state
168 .blob_store
169 .put(new_content.as_bytes())
170 .map_err(|e| ApiError::Internal(format!("failed to store blob: {e}")))?;
171
172 let size_bytes = new_content.len() as u64;
173
174 let event = EventEnvelope {
176 event_id: EventId::new(),
177 session_id: session_id.clone(),
178 branch_id,
179 run_id: None,
180 seq: 0,
181 timestamp: EventEnvelope::now_micros(),
182 parent_id: None,
183 payload: EventPayload::FileWrite {
184 path: file_path.clone(),
185 blob_hash: blob_hash.clone().into(),
186 size_bytes,
187 content_type: None,
188 },
189 metadata: HashMap::new(),
190 schema_version: 1,
191 };
192
193 state.journal.append(event).await?;
194
195 Ok((
196 StatusCode::OK,
197 Json(FileWriteResponse {
198 path: file_path,
199 blob_hash: blob_hash.to_string(),
200 size_bytes,
201 }),
202 ))
203}
204
205pub async fn write_file(
210 State(state): State<Arc<AppState>>,
211 Path((session_id, file_path)): Path<(String, String)>,
212 Query(branch): Query<BranchQuery>,
213 body: Bytes,
214) -> Result<(StatusCode, Json<FileWriteResponse>), ApiError> {
215 let session_id = SessionId::from_string(session_id.clone());
216 let branch_id = BranchId::from_string(branch.branch);
217 let file_path = normalize_path(&file_path);
218
219 state
221 .journal
222 .get_session(&session_id)
223 .await?
224 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
225
226 let blob_hash = state
228 .blob_store
229 .put(&body)
230 .map_err(|e| ApiError::Internal(format!("failed to store blob: {e}")))?;
231
232 let size_bytes = body.len() as u64;
233
234 let event = EventEnvelope {
236 event_id: EventId::new(),
237 session_id: session_id.clone(),
238 branch_id,
239 run_id: None,
240 seq: 0,
241 timestamp: EventEnvelope::now_micros(),
242 parent_id: None,
243 payload: EventPayload::FileWrite {
244 path: file_path.clone(),
245 blob_hash: blob_hash.clone().into(),
246 size_bytes,
247 content_type: None,
248 },
249 metadata: HashMap::new(),
250 schema_version: 1,
251 };
252
253 state.journal.append(event).await?;
254
255 Ok((
256 StatusCode::CREATED,
257 Json(FileWriteResponse {
258 path: file_path,
259 blob_hash: blob_hash.to_string(),
260 size_bytes,
261 }),
262 ))
263}
264
265pub async fn delete_file(
270 State(state): State<Arc<AppState>>,
271 Path((session_id, file_path)): Path<(String, String)>,
272 Query(branch): Query<BranchQuery>,
273) -> Result<StatusCode, ApiError> {
274 let session_id = SessionId::from_string(session_id.clone());
275 let branch_id = BranchId::from_string(branch.branch);
276 let file_path = normalize_path(&file_path);
277
278 state
280 .journal
281 .get_session(&session_id)
282 .await?
283 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
284
285 let event = EventEnvelope {
286 event_id: EventId::new(),
287 session_id: session_id.clone(),
288 branch_id,
289 run_id: None,
290 seq: 0,
291 timestamp: EventEnvelope::now_micros(),
292 parent_id: None,
293 payload: EventPayload::FileDelete {
294 path: file_path.clone(),
295 },
296 metadata: HashMap::new(),
297 schema_version: 1,
298 };
299
300 state.journal.append(event).await?;
301
302 Ok(StatusCode::NO_CONTENT)
303}
304
305pub async fn get_manifest(
310 State(state): State<Arc<AppState>>,
311 Path(session_id): Path<String>,
312 Query(branch): Query<BranchQuery>,
313) -> Result<Json<ManifestResponse>, ApiError> {
314 let session_id = SessionId::from_string(session_id.clone());
315 let branch_id = BranchId::from_string(branch.branch);
316
317 state
319 .journal
320 .get_session(&session_id)
321 .await?
322 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
323
324 let entries = build_manifest(&state, &session_id, &branch_id).await?;
325
326 Ok(Json(ManifestResponse {
327 session_id: session_id.to_string(),
328 entries,
329 }))
330}
331
332async fn build_manifest(
336 state: &Arc<AppState>,
337 session_id: &SessionId,
338 branch_id: &BranchId,
339) -> Result<Vec<ManifestEntry>, ApiError> {
340 let query = EventQuery::new()
341 .session(session_id.clone())
342 .branch(branch_id.clone());
343 let events = state.journal.read(query).await?;
344
345 let mut manifest = lago_fs::Manifest::new();
346
347 for event in &events {
348 match &event.payload {
349 EventPayload::FileWrite {
350 path,
351 blob_hash,
352 size_bytes,
353 content_type,
354 } => {
355 manifest.apply_write(
357 path.clone(),
358 lago_core::BlobHash::from_hex(blob_hash.as_str()),
359 *size_bytes,
360 content_type.clone(),
361 event.timestamp,
362 );
363 }
364 EventPayload::FileDelete { path } => {
365 manifest.apply_delete(path);
366 }
367 EventPayload::FileRename { old_path, new_path } => {
368 manifest.apply_rename(old_path, new_path.clone());
369 }
370 _ => {}
371 }
372 }
373
374 let entries: Vec<ManifestEntry> = manifest.entries().values().cloned().collect();
375
376 Ok(entries)
377}
378
/// Returns `path` with a leading `/`, adding one only when it is missing.
///
/// Nothing else is altered: repeated slashes, `.`/`..` segments and trailing
/// slashes pass through unchanged, and an empty input becomes `"/"`.
fn normalize_path(path: &str) -> String {
    let mut normalized = String::with_capacity(path.len() + 1);
    if !path.starts_with('/') {
        normalized.push('/');
    }
    normalized.push_str(path);
    normalized
}