Skip to main content

lago_api/routes/
files.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use axum::Json;
6use axum::body::Bytes;
7use axum::extract::{Path, Query, State};
8use axum::http::StatusCode;
9use serde::{Deserialize, Serialize};
10use tracing::debug;
11
12use lago_core::event::{EventEnvelope, EventPayload};
13use lago_core::hashline::{HashLineEdit, HashLineFile};
14use lago_core::id::{BranchId, EventId, SessionId};
15use lago_core::{EventQuery, ManifestEntry};
16
17use crate::error::ApiError;
18use crate::state::{AppState, CachedManifest, MANIFEST_CACHE_TTL_SECS};
19
20// --- Response types
21
22#[derive(Serialize)]
23pub struct FileWriteResponse {
24    pub path: String,
25    pub blob_hash: String,
26    pub size_bytes: u64,
27}
28
29#[derive(Serialize)]
30pub struct ManifestResponse {
31    pub session_id: String,
32    pub entries: Vec<ManifestEntry>,
33}
34
35// --- Query types
36
37#[derive(Debug, Deserialize, Default)]
38pub struct FileReadQuery {
39    /// Optional format: "hashline" returns content in hashline format.
40    #[serde(default)]
41    pub format: Option<String>,
42    /// Branch to read from (default: main).
43    #[serde(default = "default_branch")]
44    pub branch: String,
45}
46
47#[derive(Debug, Deserialize, Default)]
48pub struct BranchQuery {
49    /// Branch to operate on (default: main).
50    #[serde(default = "default_branch")]
51    pub branch: String,
52}
53
/// Branch name substituted when a request omits the `branch` query parameter.
fn default_branch() -> String {
    String::from("main")
}
57
58// --- Handlers
59
60/// GET /v1/sessions/:id/files/*path
61///
62/// Reads a file from the session's virtual filesystem by replaying the
63/// manifest from journal events and fetching the blob from the store.
64///
65/// Supports `?format=hashline` to return content in hashline format
66/// (`N:HHHH|content` per line) with `x-format: hashline` header.
67pub async fn read_file(
68    State(state): State<Arc<AppState>>,
69    Path((session_id, file_path)): Path<(String, String)>,
70    Query(query): Query<FileReadQuery>,
71) -> Result<axum::http::Response<axum::body::Body>, ApiError> {
72    let session_id = SessionId::from_string(session_id.clone());
73    let branch_id = BranchId::from_string(query.branch.clone());
74    let file_path = normalize_path(&file_path);
75
76    // Verify session exists
77    state
78        .journal
79        .get_session(&session_id)
80        .await?
81        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
82
83    // Build manifest from events
84    let manifest = build_manifest(&state, &session_id, &branch_id).await?;
85
86    let entry = manifest
87        .iter()
88        .find(|e| e.path == file_path)
89        .ok_or_else(|| ApiError::NotFound(format!("file not found: {file_path}")))?;
90
91    let data = state
92        .blob_store
93        .get(&entry.blob_hash)
94        .map_err(|e| ApiError::Internal(format!("failed to read blob: {e}")))?;
95
96    // If hashline format requested, convert
97    if query.format.as_deref() == Some("hashline") {
98        let text = String::from_utf8(data)
99            .map_err(|_| ApiError::BadRequest("file is not valid UTF-8".to_string()))?;
100        let hashline_file = HashLineFile::from_content(&text);
101        let hashline_text = hashline_file.to_hashline_text();
102
103        return Ok(axum::http::Response::builder()
104            .status(StatusCode::OK)
105            .header("content-type", "text/plain")
106            .header("x-format", "hashline")
107            .header("x-blob-hash", entry.blob_hash.as_str())
108            .body(axum::body::Body::from(hashline_text))
109            .unwrap());
110    }
111
112    let content_type = entry
113        .content_type
114        .clone()
115        .unwrap_or_else(|| "application/octet-stream".to_string());
116
117    Ok(axum::http::Response::builder()
118        .status(StatusCode::OK)
119        .header("content-type", content_type)
120        .header("x-blob-hash", entry.blob_hash.as_str())
121        .body(axum::body::Body::from(data))
122        .unwrap())
123}
124
125/// PATCH /v1/sessions/:id/files/*path
126///
127/// Applies hashline edits to a file. Accepts a JSON array of `HashLineEdit`
128/// operations, applies them to the current file content, stores the result
129/// as a new blob, and emits a `FileWrite` event.
130pub async fn patch_file(
131    State(state): State<Arc<AppState>>,
132    Path((session_id, file_path)): Path<(String, String)>,
133    Query(branch): Query<BranchQuery>,
134    Json(edits): Json<Vec<HashLineEdit>>,
135) -> Result<(StatusCode, Json<FileWriteResponse>), ApiError> {
136    let session_id = SessionId::from_string(session_id.clone());
137    let branch_id = BranchId::from_string(branch.branch);
138    let file_path = normalize_path(&file_path);
139
140    // Verify session exists
141    state
142        .journal
143        .get_session(&session_id)
144        .await?
145        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
146
147    // Build manifest and read current file
148    let manifest = build_manifest(&state, &session_id, &branch_id).await?;
149    let entry = manifest
150        .iter()
151        .find(|e| e.path == file_path)
152        .ok_or_else(|| ApiError::NotFound(format!("file not found: {file_path}")))?;
153
154    let data = state
155        .blob_store
156        .get(&entry.blob_hash)
157        .map_err(|e| ApiError::Internal(format!("failed to read blob: {e}")))?;
158
159    let text = String::from_utf8(data)
160        .map_err(|_| ApiError::BadRequest("file is not valid UTF-8".to_string()))?;
161
162    // Apply hashline edits
163    let hashline_file = HashLineFile::from_content(&text);
164    let new_content = hashline_file
165        .apply_edits(&edits)
166        .map_err(lago_core::LagoError::from)?;
167
168    // Store new blob
169    let blob_hash = state
170        .blob_store
171        .put(new_content.as_bytes())
172        .map_err(|e| ApiError::Internal(format!("failed to store blob: {e}")))?;
173
174    let size_bytes = new_content.len() as u64;
175
176    // Emit a FileWrite event
177    let event = EventEnvelope {
178        event_id: EventId::new(),
179        session_id: session_id.clone(),
180        branch_id: branch_id.clone(),
181        run_id: None,
182        seq: 0,
183        timestamp: EventEnvelope::now_micros(),
184        parent_id: None,
185        payload: EventPayload::FileWrite {
186            path: file_path.clone(),
187            blob_hash: blob_hash.clone().into(),
188            size_bytes,
189            content_type: None,
190        },
191        metadata: HashMap::new(),
192        schema_version: 1,
193    };
194
195    state.journal.append(event).await?;
196    invalidate_manifest_cache(&state, &session_id, &branch_id).await;
197
198    Ok((
199        StatusCode::OK,
200        Json(FileWriteResponse {
201            path: file_path,
202            blob_hash: blob_hash.to_string(),
203            size_bytes,
204        }),
205    ))
206}
207
208/// PUT /v1/sessions/:id/files/*path
209///
210/// Writes a file to the session's virtual filesystem. The file contents are
211/// stored as a blob and a `FileWrite` event is appended to the journal.
212pub async fn write_file(
213    State(state): State<Arc<AppState>>,
214    Path((session_id, file_path)): Path<(String, String)>,
215    Query(branch): Query<BranchQuery>,
216    body: Bytes,
217) -> Result<(StatusCode, Json<FileWriteResponse>), ApiError> {
218    let session_id = SessionId::from_string(session_id.clone());
219    let branch_id = BranchId::from_string(branch.branch);
220    let file_path = normalize_path(&file_path);
221
222    // Verify session exists
223    state
224        .journal
225        .get_session(&session_id)
226        .await?
227        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
228
229    // Store the blob
230    let blob_hash = state
231        .blob_store
232        .put(&body)
233        .map_err(|e| ApiError::Internal(format!("failed to store blob: {e}")))?;
234
235    let size_bytes = body.len() as u64;
236
237    // Emit a FileWrite event
238    let event = EventEnvelope {
239        event_id: EventId::new(),
240        session_id: session_id.clone(),
241        branch_id: branch_id.clone(),
242        run_id: None,
243        seq: 0,
244        timestamp: EventEnvelope::now_micros(),
245        parent_id: None,
246        payload: EventPayload::FileWrite {
247            path: file_path.clone(),
248            blob_hash: blob_hash.clone().into(),
249            size_bytes,
250            content_type: None,
251        },
252        metadata: HashMap::new(),
253        schema_version: 1,
254    };
255
256    state.journal.append(event).await?;
257    invalidate_manifest_cache(&state, &session_id, &branch_id).await;
258
259    Ok((
260        StatusCode::CREATED,
261        Json(FileWriteResponse {
262            path: file_path,
263            blob_hash: blob_hash.to_string(),
264            size_bytes,
265        }),
266    ))
267}
268
269/// DELETE /v1/sessions/:id/files/*path
270///
271/// Removes a file from the session's virtual filesystem by appending a
272/// `FileDelete` event.
273pub async fn delete_file(
274    State(state): State<Arc<AppState>>,
275    Path((session_id, file_path)): Path<(String, String)>,
276    Query(branch): Query<BranchQuery>,
277) -> Result<StatusCode, ApiError> {
278    let session_id = SessionId::from_string(session_id.clone());
279    let branch_id = BranchId::from_string(branch.branch);
280    let file_path = normalize_path(&file_path);
281
282    // Verify session exists
283    state
284        .journal
285        .get_session(&session_id)
286        .await?
287        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
288
289    let event = EventEnvelope {
290        event_id: EventId::new(),
291        session_id: session_id.clone(),
292        branch_id: branch_id.clone(),
293        run_id: None,
294        seq: 0,
295        timestamp: EventEnvelope::now_micros(),
296        parent_id: None,
297        payload: EventPayload::FileDelete {
298            path: file_path.clone(),
299        },
300        metadata: HashMap::new(),
301        schema_version: 1,
302    };
303
304    state.journal.append(event).await?;
305    invalidate_manifest_cache(&state, &session_id, &branch_id).await;
306
307    Ok(StatusCode::NO_CONTENT)
308}
309
310/// GET /v1/sessions/:id/manifest
311///
312/// Returns the full manifest (list of all files) for a session by replaying
313/// file events from the journal.
314pub async fn get_manifest(
315    State(state): State<Arc<AppState>>,
316    Path(session_id): Path<String>,
317    Query(branch): Query<BranchQuery>,
318) -> Result<Json<ManifestResponse>, ApiError> {
319    let session_id = SessionId::from_string(session_id.clone());
320    let branch_id = BranchId::from_string(branch.branch);
321
322    // Verify session exists
323    state
324        .journal
325        .get_session(&session_id)
326        .await?
327        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
328
329    let entries = build_manifest(&state, &session_id, &branch_id).await?;
330
331    Ok(Json(ManifestResponse {
332        session_id: session_id.to_string(),
333        entries,
334    }))
335}
336
337// --- Internal helpers
338
339/// Build a manifest by replaying file events from the journal.
340/// Uses an in-memory cache with TTL to avoid replaying events on every request.
341async fn build_manifest(
342    state: &Arc<AppState>,
343    session_id: &SessionId,
344    branch_id: &BranchId,
345) -> Result<Vec<ManifestEntry>, ApiError> {
346    let cache_key = (session_id.to_string(), branch_id.to_string());
347
348    // Check cache first (read lock — fast path)
349    {
350        let cache = state.manifest_cache.read().await;
351        if let Some(cached) = cache.get(&cache_key) {
352            if cached.cached_at.elapsed().as_secs() < MANIFEST_CACHE_TTL_SECS {
353                debug!(
354                    session_id = %session_id,
355                    branch_id = %branch_id,
356                    "manifest cache hit"
357                );
358                return Ok(cached.entries.clone());
359            }
360        }
361    }
362
363    // Cache miss or expired — replay events
364    let query = EventQuery::new()
365        .session(session_id.clone())
366        .branch(branch_id.clone());
367    let events = state.journal.read(query).await?;
368
369    let mut manifest = lago_fs::Manifest::new();
370
371    for event in &events {
372        match &event.payload {
373            EventPayload::FileWrite {
374                path,
375                blob_hash,
376                size_bytes,
377                content_type,
378            } => {
379                // Convert aios_protocol::BlobHash -> lago_core::BlobHash
380                manifest.apply_write(
381                    path.clone(),
382                    lago_core::BlobHash::from_hex(blob_hash.as_str()),
383                    *size_bytes,
384                    content_type.clone(),
385                    event.timestamp,
386                );
387            }
388            EventPayload::FileDelete { path } => {
389                manifest.apply_delete(path);
390            }
391            EventPayload::FileRename { old_path, new_path } => {
392                manifest.apply_rename(old_path, new_path.clone());
393            }
394            _ => {}
395        }
396    }
397
398    let entries: Vec<ManifestEntry> = manifest.entries().values().cloned().collect();
399
400    // Store in cache (write lock)
401    {
402        let mut cache = state.manifest_cache.write().await;
403        cache.insert(
404            cache_key,
405            CachedManifest {
406                entries: entries.clone(),
407                cached_at: Instant::now(),
408            },
409        );
410    }
411
412    debug!(
413        session_id = %session_id,
414        branch_id = %branch_id,
415        entry_count = entries.len(),
416        "manifest rebuilt and cached"
417    );
418
419    Ok(entries)
420}
421
422/// Invalidate the manifest cache for a given session + branch after a mutation.
423async fn invalidate_manifest_cache(
424    state: &Arc<AppState>,
425    session_id: &SessionId,
426    branch_id: &BranchId,
427) {
428    let cache_key = (session_id.to_string(), branch_id.to_string());
429    let mut cache = state.manifest_cache.write().await;
430    cache.remove(&cache_key);
431}
432
/// Return `path` with a leading '/' guaranteed.
///
/// A path that already begins with '/' is copied unchanged; otherwise a
/// single '/' is prepended. No other normalization is performed.
fn normalize_path(path: &str) -> String {
    let needs_slash = !path.starts_with('/');
    let mut normalized = String::with_capacity(path.len() + usize::from(needs_slash));
    if needs_slash {
        normalized.push('/');
    }
    normalized.push_str(path);
    normalized
}