1use std::collections::HashMap;
2use std::sync::Arc;
3
4use axum::Json;
5use axum::body::Bytes;
6use axum::extract::{Path, Query, State};
7use axum::http::StatusCode;
8use serde::{Deserialize, Serialize};
9
10use lago_core::event::{EventEnvelope, EventPayload};
11use lago_core::hashline::{HashLineEdit, HashLineFile};
12use lago_core::id::{BranchId, EventId, SessionId};
13use lago_core::{EventQuery, ManifestEntry};
14
15use crate::error::ApiError;
16use crate::state::AppState;
17
18#[derive(Serialize)]
21pub struct FileWriteResponse {
22 pub path: String,
23 pub blob_hash: String,
24 pub size_bytes: u64,
25}
26
27#[derive(Serialize)]
28pub struct ManifestResponse {
29 pub session_id: String,
30 pub entries: Vec<ManifestEntry>,
31}
32
33#[derive(Debug, Deserialize, Default)]
36pub struct FileReadQuery {
37 #[serde(default)]
39 pub format: Option<String>,
40}
41
42pub async fn read_file(
52 State(state): State<Arc<AppState>>,
53 Path((session_id, file_path)): Path<(String, String)>,
54 Query(query): Query<FileReadQuery>,
55) -> Result<axum::http::Response<axum::body::Body>, ApiError> {
56 let session_id = SessionId::from_string(session_id.clone());
57 let file_path = normalize_path(&file_path);
58
59 state
61 .journal
62 .get_session(&session_id)
63 .await?
64 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
65
66 let manifest = build_manifest(&state, &session_id).await?;
68
69 let entry = manifest
70 .iter()
71 .find(|e| e.path == file_path)
72 .ok_or_else(|| ApiError::NotFound(format!("file not found: {file_path}")))?;
73
74 let data = state
75 .blob_store
76 .get(&entry.blob_hash)
77 .map_err(|e| ApiError::Internal(format!("failed to read blob: {e}")))?;
78
79 if query.format.as_deref() == Some("hashline") {
81 let text = String::from_utf8(data)
82 .map_err(|_| ApiError::BadRequest("file is not valid UTF-8".to_string()))?;
83 let hashline_file = HashLineFile::from_content(&text);
84 let hashline_text = hashline_file.to_hashline_text();
85
86 return Ok(axum::http::Response::builder()
87 .status(StatusCode::OK)
88 .header("content-type", "text/plain")
89 .header("x-format", "hashline")
90 .header("x-blob-hash", entry.blob_hash.as_str())
91 .body(axum::body::Body::from(hashline_text))
92 .unwrap());
93 }
94
95 let content_type = entry
96 .content_type
97 .clone()
98 .unwrap_or_else(|| "application/octet-stream".to_string());
99
100 Ok(axum::http::Response::builder()
101 .status(StatusCode::OK)
102 .header("content-type", content_type)
103 .header("x-blob-hash", entry.blob_hash.as_str())
104 .body(axum::body::Body::from(data))
105 .unwrap())
106}
107
108pub async fn patch_file(
114 State(state): State<Arc<AppState>>,
115 Path((session_id, file_path)): Path<(String, String)>,
116 Json(edits): Json<Vec<HashLineEdit>>,
117) -> Result<(StatusCode, Json<FileWriteResponse>), ApiError> {
118 let session_id = SessionId::from_string(session_id.clone());
119 let file_path = normalize_path(&file_path);
120
121 state
123 .journal
124 .get_session(&session_id)
125 .await?
126 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
127
128 let manifest = build_manifest(&state, &session_id).await?;
130 let entry = manifest
131 .iter()
132 .find(|e| e.path == file_path)
133 .ok_or_else(|| ApiError::NotFound(format!("file not found: {file_path}")))?;
134
135 let data = state
136 .blob_store
137 .get(&entry.blob_hash)
138 .map_err(|e| ApiError::Internal(format!("failed to read blob: {e}")))?;
139
140 let text = String::from_utf8(data)
141 .map_err(|_| ApiError::BadRequest("file is not valid UTF-8".to_string()))?;
142
143 let hashline_file = HashLineFile::from_content(&text);
145 let new_content = hashline_file
146 .apply_edits(&edits)
147 .map_err(lago_core::LagoError::from)?;
148
149 let blob_hash = state
151 .blob_store
152 .put(new_content.as_bytes())
153 .map_err(|e| ApiError::Internal(format!("failed to store blob: {e}")))?;
154
155 let size_bytes = new_content.len() as u64;
156 let branch_id = BranchId::from_string("main");
157 let seq = next_seq(&state, &session_id, &branch_id).await?;
158
159 let event = EventEnvelope {
161 event_id: EventId::new(),
162 session_id: session_id.clone(),
163 branch_id,
164 run_id: None,
165 seq,
166 timestamp: EventEnvelope::now_micros(),
167 parent_id: None,
168 payload: EventPayload::FileWrite {
169 path: file_path.clone(),
170 blob_hash: blob_hash.clone(),
171 size_bytes,
172 content_type: None,
173 },
174 metadata: HashMap::new(),
175 };
176
177 state.journal.append(event).await?;
178
179 Ok((
180 StatusCode::OK,
181 Json(FileWriteResponse {
182 path: file_path,
183 blob_hash: blob_hash.to_string(),
184 size_bytes,
185 }),
186 ))
187}
188
189pub async fn write_file(
194 State(state): State<Arc<AppState>>,
195 Path((session_id, file_path)): Path<(String, String)>,
196 body: Bytes,
197) -> Result<(StatusCode, Json<FileWriteResponse>), ApiError> {
198 let session_id = SessionId::from_string(session_id.clone());
199 let file_path = normalize_path(&file_path);
200
201 state
203 .journal
204 .get_session(&session_id)
205 .await?
206 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
207
208 let blob_hash = state
210 .blob_store
211 .put(&body)
212 .map_err(|e| ApiError::Internal(format!("failed to store blob: {e}")))?;
213
214 let size_bytes = body.len() as u64;
215 let branch_id = BranchId::from_string("main");
216 let seq = next_seq(&state, &session_id, &branch_id).await?;
217
218 let event = EventEnvelope {
220 event_id: EventId::new(),
221 session_id: session_id.clone(),
222 branch_id,
223 run_id: None,
224 seq,
225 timestamp: EventEnvelope::now_micros(),
226 parent_id: None,
227 payload: EventPayload::FileWrite {
228 path: file_path.clone(),
229 blob_hash: blob_hash.clone(),
230 size_bytes,
231 content_type: None,
232 },
233 metadata: HashMap::new(),
234 };
235
236 state.journal.append(event).await?;
237
238 Ok((
239 StatusCode::CREATED,
240 Json(FileWriteResponse {
241 path: file_path,
242 blob_hash: blob_hash.to_string(),
243 size_bytes,
244 }),
245 ))
246}
247
248pub async fn delete_file(
253 State(state): State<Arc<AppState>>,
254 Path((session_id, file_path)): Path<(String, String)>,
255) -> Result<StatusCode, ApiError> {
256 let session_id = SessionId::from_string(session_id.clone());
257 let file_path = normalize_path(&file_path);
258
259 state
261 .journal
262 .get_session(&session_id)
263 .await?
264 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
265
266 let branch_id = BranchId::from_string("main");
267 let seq = next_seq(&state, &session_id, &branch_id).await?;
268
269 let event = EventEnvelope {
270 event_id: EventId::new(),
271 session_id: session_id.clone(),
272 branch_id,
273 run_id: None,
274 seq,
275 timestamp: EventEnvelope::now_micros(),
276 parent_id: None,
277 payload: EventPayload::FileDelete {
278 path: file_path.clone(),
279 },
280 metadata: HashMap::new(),
281 };
282
283 state.journal.append(event).await?;
284
285 Ok(StatusCode::NO_CONTENT)
286}
287
288pub async fn get_manifest(
293 State(state): State<Arc<AppState>>,
294 Path(session_id): Path<String>,
295) -> Result<Json<ManifestResponse>, ApiError> {
296 let session_id = SessionId::from_string(session_id.clone());
297
298 state
300 .journal
301 .get_session(&session_id)
302 .await?
303 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
304
305 let entries = build_manifest(&state, &session_id).await?;
306
307 Ok(Json(ManifestResponse {
308 session_id: session_id.to_string(),
309 entries,
310 }))
311}
312
313async fn next_seq(
317 state: &Arc<AppState>,
318 session_id: &SessionId,
319 branch_id: &BranchId,
320) -> Result<u64, ApiError> {
321 let head = state.journal.head_seq(session_id, branch_id).await?;
322 Ok(head + 1)
323}
324
325async fn build_manifest(
327 state: &Arc<AppState>,
328 session_id: &SessionId,
329) -> Result<Vec<ManifestEntry>, ApiError> {
330 let query = EventQuery::new().session(session_id.clone());
331 let events = state.journal.read(query).await?;
332
333 let mut manifest = lago_fs::Manifest::new();
334
335 for event in &events {
336 match &event.payload {
337 EventPayload::FileWrite {
338 path,
339 blob_hash,
340 size_bytes,
341 content_type,
342 } => {
343 manifest.apply_write(
344 path.clone(),
345 blob_hash.clone(),
346 *size_bytes,
347 content_type.clone(),
348 event.timestamp,
349 );
350 }
351 EventPayload::FileDelete { path } => {
352 manifest.apply_delete(path);
353 }
354 EventPayload::FileRename { old_path, new_path } => {
355 manifest.apply_rename(old_path, new_path.clone());
356 }
357 _ => {}
358 }
359 }
360
361 let entries: Vec<ManifestEntry> = manifest.entries().values().cloned().collect();
362
363 Ok(entries)
364}
365
/// Returns `path` with a leading `/` guaranteed.
///
/// A path that already begins with `/` is returned unchanged; otherwise a
/// single `/` is prepended. No other normalization is performed.
fn normalize_path(path: &str) -> String {
    let mut normalized = String::with_capacity(path.len() + 1);
    if !path.starts_with('/') {
        normalized.push('/');
    }
    normalized.push_str(path);
    normalized
}