1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use axum::Json;
6use axum::body::Bytes;
7use axum::extract::{Path, Query, State};
8use axum::http::StatusCode;
9use serde::{Deserialize, Serialize};
10use tracing::debug;
11
12use lago_core::event::{EventEnvelope, EventPayload};
13use lago_core::hashline::{HashLineEdit, HashLineFile};
14use lago_core::id::{BranchId, EventId, SessionId};
15use lago_core::{EventQuery, ManifestEntry};
16
17use crate::error::ApiError;
18use crate::state::{AppState, CachedManifest, MANIFEST_CACHE_TTL_SECS};
19
20#[derive(Serialize)]
23pub struct FileWriteResponse {
24 pub path: String,
25 pub blob_hash: String,
26 pub size_bytes: u64,
27}
28
29#[derive(Serialize)]
30pub struct ManifestResponse {
31 pub session_id: String,
32 pub entries: Vec<ManifestEntry>,
33}
34
35#[derive(Debug, Deserialize, Default)]
38pub struct FileReadQuery {
39 #[serde(default)]
41 pub format: Option<String>,
42 #[serde(default = "default_branch")]
44 pub branch: String,
45}
46
47#[derive(Debug, Deserialize, Default)]
48pub struct BranchQuery {
49 #[serde(default = "default_branch")]
51 pub branch: String,
52}
53
54fn default_branch() -> String {
55 "main".to_string()
56}
57
58pub async fn read_file(
68 State(state): State<Arc<AppState>>,
69 Path((session_id, file_path)): Path<(String, String)>,
70 Query(query): Query<FileReadQuery>,
71) -> Result<axum::http::Response<axum::body::Body>, ApiError> {
72 let session_id = SessionId::from_string(session_id.clone());
73 let branch_id = BranchId::from_string(query.branch.clone());
74 let file_path = normalize_path(&file_path);
75
76 state
78 .journal
79 .get_session(&session_id)
80 .await?
81 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
82
83 let manifest = build_manifest(&state, &session_id, &branch_id).await?;
85
86 let entry = manifest
87 .iter()
88 .find(|e| e.path == file_path)
89 .ok_or_else(|| ApiError::NotFound(format!("file not found: {file_path}")))?;
90
91 let data = state
92 .blob_store
93 .get(&entry.blob_hash)
94 .map_err(|e| ApiError::Internal(format!("failed to read blob: {e}")))?;
95
96 if query.format.as_deref() == Some("hashline") {
98 let text = String::from_utf8(data)
99 .map_err(|_| ApiError::BadRequest("file is not valid UTF-8".to_string()))?;
100 let hashline_file = HashLineFile::from_content(&text);
101 let hashline_text = hashline_file.to_hashline_text();
102
103 return Ok(axum::http::Response::builder()
104 .status(StatusCode::OK)
105 .header("content-type", "text/plain")
106 .header("x-format", "hashline")
107 .header("x-blob-hash", entry.blob_hash.as_str())
108 .body(axum::body::Body::from(hashline_text))
109 .unwrap());
110 }
111
112 let content_type = entry
113 .content_type
114 .clone()
115 .unwrap_or_else(|| "application/octet-stream".to_string());
116
117 Ok(axum::http::Response::builder()
118 .status(StatusCode::OK)
119 .header("content-type", content_type)
120 .header("x-blob-hash", entry.blob_hash.as_str())
121 .body(axum::body::Body::from(data))
122 .unwrap())
123}
124
125pub async fn patch_file(
131 State(state): State<Arc<AppState>>,
132 Path((session_id, file_path)): Path<(String, String)>,
133 Query(branch): Query<BranchQuery>,
134 Json(edits): Json<Vec<HashLineEdit>>,
135) -> Result<(StatusCode, Json<FileWriteResponse>), ApiError> {
136 let session_id = SessionId::from_string(session_id.clone());
137 let branch_id = BranchId::from_string(branch.branch);
138 let file_path = normalize_path(&file_path);
139
140 state
142 .journal
143 .get_session(&session_id)
144 .await?
145 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
146
147 let manifest = build_manifest(&state, &session_id, &branch_id).await?;
149 let entry = manifest
150 .iter()
151 .find(|e| e.path == file_path)
152 .ok_or_else(|| ApiError::NotFound(format!("file not found: {file_path}")))?;
153
154 let data = state
155 .blob_store
156 .get(&entry.blob_hash)
157 .map_err(|e| ApiError::Internal(format!("failed to read blob: {e}")))?;
158
159 let text = String::from_utf8(data)
160 .map_err(|_| ApiError::BadRequest("file is not valid UTF-8".to_string()))?;
161
162 let hashline_file = HashLineFile::from_content(&text);
164 let new_content = hashline_file
165 .apply_edits(&edits)
166 .map_err(lago_core::LagoError::from)?;
167
168 let blob_hash = state
170 .blob_store
171 .put(new_content.as_bytes())
172 .map_err(|e| ApiError::Internal(format!("failed to store blob: {e}")))?;
173
174 let size_bytes = new_content.len() as u64;
175
176 let event = EventEnvelope {
178 event_id: EventId::new(),
179 session_id: session_id.clone(),
180 branch_id: branch_id.clone(),
181 run_id: None,
182 seq: 0,
183 timestamp: EventEnvelope::now_micros(),
184 parent_id: None,
185 payload: EventPayload::FileWrite {
186 path: file_path.clone(),
187 blob_hash: blob_hash.clone().into(),
188 size_bytes,
189 content_type: None,
190 },
191 metadata: HashMap::new(),
192 schema_version: 1,
193 };
194
195 state.journal.append(event).await?;
196 invalidate_manifest_cache(&state, &session_id, &branch_id).await;
197
198 Ok((
199 StatusCode::OK,
200 Json(FileWriteResponse {
201 path: file_path,
202 blob_hash: blob_hash.to_string(),
203 size_bytes,
204 }),
205 ))
206}
207
208pub async fn write_file(
213 State(state): State<Arc<AppState>>,
214 Path((session_id, file_path)): Path<(String, String)>,
215 Query(branch): Query<BranchQuery>,
216 body: Bytes,
217) -> Result<(StatusCode, Json<FileWriteResponse>), ApiError> {
218 let session_id = SessionId::from_string(session_id.clone());
219 let branch_id = BranchId::from_string(branch.branch);
220 let file_path = normalize_path(&file_path);
221
222 state
224 .journal
225 .get_session(&session_id)
226 .await?
227 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
228
229 let blob_hash = state
231 .blob_store
232 .put(&body)
233 .map_err(|e| ApiError::Internal(format!("failed to store blob: {e}")))?;
234
235 let size_bytes = body.len() as u64;
236
237 let event = EventEnvelope {
239 event_id: EventId::new(),
240 session_id: session_id.clone(),
241 branch_id: branch_id.clone(),
242 run_id: None,
243 seq: 0,
244 timestamp: EventEnvelope::now_micros(),
245 parent_id: None,
246 payload: EventPayload::FileWrite {
247 path: file_path.clone(),
248 blob_hash: blob_hash.clone().into(),
249 size_bytes,
250 content_type: None,
251 },
252 metadata: HashMap::new(),
253 schema_version: 1,
254 };
255
256 state.journal.append(event).await?;
257 invalidate_manifest_cache(&state, &session_id, &branch_id).await;
258
259 Ok((
260 StatusCode::CREATED,
261 Json(FileWriteResponse {
262 path: file_path,
263 blob_hash: blob_hash.to_string(),
264 size_bytes,
265 }),
266 ))
267}
268
269pub async fn delete_file(
274 State(state): State<Arc<AppState>>,
275 Path((session_id, file_path)): Path<(String, String)>,
276 Query(branch): Query<BranchQuery>,
277) -> Result<StatusCode, ApiError> {
278 let session_id = SessionId::from_string(session_id.clone());
279 let branch_id = BranchId::from_string(branch.branch);
280 let file_path = normalize_path(&file_path);
281
282 state
284 .journal
285 .get_session(&session_id)
286 .await?
287 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
288
289 let event = EventEnvelope {
290 event_id: EventId::new(),
291 session_id: session_id.clone(),
292 branch_id: branch_id.clone(),
293 run_id: None,
294 seq: 0,
295 timestamp: EventEnvelope::now_micros(),
296 parent_id: None,
297 payload: EventPayload::FileDelete {
298 path: file_path.clone(),
299 },
300 metadata: HashMap::new(),
301 schema_version: 1,
302 };
303
304 state.journal.append(event).await?;
305 invalidate_manifest_cache(&state, &session_id, &branch_id).await;
306
307 Ok(StatusCode::NO_CONTENT)
308}
309
310pub async fn get_manifest(
315 State(state): State<Arc<AppState>>,
316 Path(session_id): Path<String>,
317 Query(branch): Query<BranchQuery>,
318) -> Result<Json<ManifestResponse>, ApiError> {
319 let session_id = SessionId::from_string(session_id.clone());
320 let branch_id = BranchId::from_string(branch.branch);
321
322 state
324 .journal
325 .get_session(&session_id)
326 .await?
327 .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
328
329 let entries = build_manifest(&state, &session_id, &branch_id).await?;
330
331 Ok(Json(ManifestResponse {
332 session_id: session_id.to_string(),
333 entries,
334 }))
335}
336
337async fn build_manifest(
342 state: &Arc<AppState>,
343 session_id: &SessionId,
344 branch_id: &BranchId,
345) -> Result<Vec<ManifestEntry>, ApiError> {
346 let cache_key = (session_id.to_string(), branch_id.to_string());
347
348 {
350 let cache = state.manifest_cache.read().await;
351 if let Some(cached) = cache.get(&cache_key) {
352 if cached.cached_at.elapsed().as_secs() < MANIFEST_CACHE_TTL_SECS {
353 debug!(
354 session_id = %session_id,
355 branch_id = %branch_id,
356 "manifest cache hit"
357 );
358 return Ok(cached.entries.clone());
359 }
360 }
361 }
362
363 let query = EventQuery::new()
365 .session(session_id.clone())
366 .branch(branch_id.clone());
367 let events = state.journal.read(query).await?;
368
369 let mut manifest = lago_fs::Manifest::new();
370
371 for event in &events {
372 match &event.payload {
373 EventPayload::FileWrite {
374 path,
375 blob_hash,
376 size_bytes,
377 content_type,
378 } => {
379 manifest.apply_write(
381 path.clone(),
382 lago_core::BlobHash::from_hex(blob_hash.as_str()),
383 *size_bytes,
384 content_type.clone(),
385 event.timestamp,
386 );
387 }
388 EventPayload::FileDelete { path } => {
389 manifest.apply_delete(path);
390 }
391 EventPayload::FileRename { old_path, new_path } => {
392 manifest.apply_rename(old_path, new_path.clone());
393 }
394 _ => {}
395 }
396 }
397
398 let entries: Vec<ManifestEntry> = manifest.entries().values().cloned().collect();
399
400 {
402 let mut cache = state.manifest_cache.write().await;
403 cache.insert(
404 cache_key,
405 CachedManifest {
406 entries: entries.clone(),
407 cached_at: Instant::now(),
408 },
409 );
410 }
411
412 debug!(
413 session_id = %session_id,
414 branch_id = %branch_id,
415 entry_count = entries.len(),
416 "manifest rebuilt and cached"
417 );
418
419 Ok(entries)
420}
421
422async fn invalidate_manifest_cache(
424 state: &Arc<AppState>,
425 session_id: &SessionId,
426 branch_id: &BranchId,
427) {
428 let cache_key = (session_id.to_string(), branch_id.to_string());
429 let mut cache = state.manifest_cache.write().await;
430 cache.remove(&cache_key);
431}
432
433fn normalize_path(path: &str) -> String {
435 if path.starts_with('/') {
436 path.to_string()
437 } else {
438 format!("/{path}")
439 }
440}