Skip to main content

lago_api/routes/
files.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use axum::Json;
5use axum::body::Bytes;
6use axum::extract::{Path, Query, State};
7use axum::http::StatusCode;
8use serde::{Deserialize, Serialize};
9
10use lago_core::event::{EventEnvelope, EventPayload};
11use lago_core::hashline::{HashLineEdit, HashLineFile};
12use lago_core::id::{BranchId, EventId, SessionId};
13use lago_core::{EventQuery, ManifestEntry};
14
15use crate::error::ApiError;
16use crate::state::AppState;
17
18// --- Response types
19
20#[derive(Serialize)]
21pub struct FileWriteResponse {
22    pub path: String,
23    pub blob_hash: String,
24    pub size_bytes: u64,
25}
26
27#[derive(Serialize)]
28pub struct ManifestResponse {
29    pub session_id: String,
30    pub entries: Vec<ManifestEntry>,
31}
32
33// --- Query types
34
35#[derive(Debug, Deserialize, Default)]
36pub struct FileReadQuery {
37    /// Optional format: "hashline" returns content in hashline format.
38    #[serde(default)]
39    pub format: Option<String>,
40}
41
42// --- Handlers
43
44/// GET /v1/sessions/:id/files/*path
45///
46/// Reads a file from the session's virtual filesystem by replaying the
47/// manifest from journal events and fetching the blob from the store.
48///
49/// Supports `?format=hashline` to return content in hashline format
50/// (`N:HHHH|content` per line) with `x-format: hashline` header.
51pub async fn read_file(
52    State(state): State<Arc<AppState>>,
53    Path((session_id, file_path)): Path<(String, String)>,
54    Query(query): Query<FileReadQuery>,
55) -> Result<axum::http::Response<axum::body::Body>, ApiError> {
56    let session_id = SessionId::from_string(session_id.clone());
57    let file_path = normalize_path(&file_path);
58
59    // Verify session exists
60    state
61        .journal
62        .get_session(&session_id)
63        .await?
64        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
65
66    // Build manifest from events
67    let manifest = build_manifest(&state, &session_id).await?;
68
69    let entry = manifest
70        .iter()
71        .find(|e| e.path == file_path)
72        .ok_or_else(|| ApiError::NotFound(format!("file not found: {file_path}")))?;
73
74    let data = state
75        .blob_store
76        .get(&entry.blob_hash)
77        .map_err(|e| ApiError::Internal(format!("failed to read blob: {e}")))?;
78
79    // If hashline format requested, convert
80    if query.format.as_deref() == Some("hashline") {
81        let text = String::from_utf8(data)
82            .map_err(|_| ApiError::BadRequest("file is not valid UTF-8".to_string()))?;
83        let hashline_file = HashLineFile::from_content(&text);
84        let hashline_text = hashline_file.to_hashline_text();
85
86        return Ok(axum::http::Response::builder()
87            .status(StatusCode::OK)
88            .header("content-type", "text/plain")
89            .header("x-format", "hashline")
90            .header("x-blob-hash", entry.blob_hash.as_str())
91            .body(axum::body::Body::from(hashline_text))
92            .unwrap());
93    }
94
95    let content_type = entry
96        .content_type
97        .clone()
98        .unwrap_or_else(|| "application/octet-stream".to_string());
99
100    Ok(axum::http::Response::builder()
101        .status(StatusCode::OK)
102        .header("content-type", content_type)
103        .header("x-blob-hash", entry.blob_hash.as_str())
104        .body(axum::body::Body::from(data))
105        .unwrap())
106}
107
108/// PATCH /v1/sessions/:id/files/*path
109///
110/// Applies hashline edits to a file. Accepts a JSON array of `HashLineEdit`
111/// operations, applies them to the current file content, stores the result
112/// as a new blob, and emits a `FileWrite` event.
113pub async fn patch_file(
114    State(state): State<Arc<AppState>>,
115    Path((session_id, file_path)): Path<(String, String)>,
116    Json(edits): Json<Vec<HashLineEdit>>,
117) -> Result<(StatusCode, Json<FileWriteResponse>), ApiError> {
118    let session_id = SessionId::from_string(session_id.clone());
119    let file_path = normalize_path(&file_path);
120
121    // Verify session exists
122    state
123        .journal
124        .get_session(&session_id)
125        .await?
126        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
127
128    // Build manifest and read current file
129    let manifest = build_manifest(&state, &session_id).await?;
130    let entry = manifest
131        .iter()
132        .find(|e| e.path == file_path)
133        .ok_or_else(|| ApiError::NotFound(format!("file not found: {file_path}")))?;
134
135    let data = state
136        .blob_store
137        .get(&entry.blob_hash)
138        .map_err(|e| ApiError::Internal(format!("failed to read blob: {e}")))?;
139
140    let text = String::from_utf8(data)
141        .map_err(|_| ApiError::BadRequest("file is not valid UTF-8".to_string()))?;
142
143    // Apply hashline edits
144    let hashline_file = HashLineFile::from_content(&text);
145    let new_content = hashline_file
146        .apply_edits(&edits)
147        .map_err(lago_core::LagoError::from)?;
148
149    // Store new blob
150    let blob_hash = state
151        .blob_store
152        .put(new_content.as_bytes())
153        .map_err(|e| ApiError::Internal(format!("failed to store blob: {e}")))?;
154
155    let size_bytes = new_content.len() as u64;
156    let branch_id = BranchId::from_string("main");
157    let seq = next_seq(&state, &session_id, &branch_id).await?;
158
159    // Emit a FileWrite event
160    let event = EventEnvelope {
161        event_id: EventId::new(),
162        session_id: session_id.clone(),
163        branch_id,
164        run_id: None,
165        seq,
166        timestamp: EventEnvelope::now_micros(),
167        parent_id: None,
168        payload: EventPayload::FileWrite {
169            path: file_path.clone(),
170            blob_hash: blob_hash.clone(),
171            size_bytes,
172            content_type: None,
173        },
174        metadata: HashMap::new(),
175    };
176
177    state.journal.append(event).await?;
178
179    Ok((
180        StatusCode::OK,
181        Json(FileWriteResponse {
182            path: file_path,
183            blob_hash: blob_hash.to_string(),
184            size_bytes,
185        }),
186    ))
187}
188
189/// PUT /v1/sessions/:id/files/*path
190///
191/// Writes a file to the session's virtual filesystem. The file contents are
192/// stored as a blob and a `FileWrite` event is appended to the journal.
193pub async fn write_file(
194    State(state): State<Arc<AppState>>,
195    Path((session_id, file_path)): Path<(String, String)>,
196    body: Bytes,
197) -> Result<(StatusCode, Json<FileWriteResponse>), ApiError> {
198    let session_id = SessionId::from_string(session_id.clone());
199    let file_path = normalize_path(&file_path);
200
201    // Verify session exists
202    state
203        .journal
204        .get_session(&session_id)
205        .await?
206        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
207
208    // Store the blob
209    let blob_hash = state
210        .blob_store
211        .put(&body)
212        .map_err(|e| ApiError::Internal(format!("failed to store blob: {e}")))?;
213
214    let size_bytes = body.len() as u64;
215    let branch_id = BranchId::from_string("main");
216    let seq = next_seq(&state, &session_id, &branch_id).await?;
217
218    // Emit a FileWrite event
219    let event = EventEnvelope {
220        event_id: EventId::new(),
221        session_id: session_id.clone(),
222        branch_id,
223        run_id: None,
224        seq,
225        timestamp: EventEnvelope::now_micros(),
226        parent_id: None,
227        payload: EventPayload::FileWrite {
228            path: file_path.clone(),
229            blob_hash: blob_hash.clone(),
230            size_bytes,
231            content_type: None,
232        },
233        metadata: HashMap::new(),
234    };
235
236    state.journal.append(event).await?;
237
238    Ok((
239        StatusCode::CREATED,
240        Json(FileWriteResponse {
241            path: file_path,
242            blob_hash: blob_hash.to_string(),
243            size_bytes,
244        }),
245    ))
246}
247
248/// DELETE /v1/sessions/:id/files/*path
249///
250/// Removes a file from the session's virtual filesystem by appending a
251/// `FileDelete` event.
252pub async fn delete_file(
253    State(state): State<Arc<AppState>>,
254    Path((session_id, file_path)): Path<(String, String)>,
255) -> Result<StatusCode, ApiError> {
256    let session_id = SessionId::from_string(session_id.clone());
257    let file_path = normalize_path(&file_path);
258
259    // Verify session exists
260    state
261        .journal
262        .get_session(&session_id)
263        .await?
264        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
265
266    let branch_id = BranchId::from_string("main");
267    let seq = next_seq(&state, &session_id, &branch_id).await?;
268
269    let event = EventEnvelope {
270        event_id: EventId::new(),
271        session_id: session_id.clone(),
272        branch_id,
273        run_id: None,
274        seq,
275        timestamp: EventEnvelope::now_micros(),
276        parent_id: None,
277        payload: EventPayload::FileDelete {
278            path: file_path.clone(),
279        },
280        metadata: HashMap::new(),
281    };
282
283    state.journal.append(event).await?;
284
285    Ok(StatusCode::NO_CONTENT)
286}
287
288/// GET /v1/sessions/:id/manifest
289///
290/// Returns the full manifest (list of all files) for a session by replaying
291/// file events from the journal.
292pub async fn get_manifest(
293    State(state): State<Arc<AppState>>,
294    Path(session_id): Path<String>,
295) -> Result<Json<ManifestResponse>, ApiError> {
296    let session_id = SessionId::from_string(session_id.clone());
297
298    // Verify session exists
299    state
300        .journal
301        .get_session(&session_id)
302        .await?
303        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
304
305    let entries = build_manifest(&state, &session_id).await?;
306
307    Ok(Json(ManifestResponse {
308        session_id: session_id.to_string(),
309        entries,
310    }))
311}
312
313// --- Internal helpers
314
315/// Get the next sequence number for a session+branch.
316async fn next_seq(
317    state: &Arc<AppState>,
318    session_id: &SessionId,
319    branch_id: &BranchId,
320) -> Result<u64, ApiError> {
321    let head = state.journal.head_seq(session_id, branch_id).await?;
322    Ok(head + 1)
323}
324
325/// Build a manifest by replaying file events from the journal.
326async fn build_manifest(
327    state: &Arc<AppState>,
328    session_id: &SessionId,
329) -> Result<Vec<ManifestEntry>, ApiError> {
330    let query = EventQuery::new().session(session_id.clone());
331    let events = state.journal.read(query).await?;
332
333    let mut manifest = lago_fs::Manifest::new();
334
335    for event in &events {
336        match &event.payload {
337            EventPayload::FileWrite {
338                path,
339                blob_hash,
340                size_bytes,
341                content_type,
342            } => {
343                manifest.apply_write(
344                    path.clone(),
345                    blob_hash.clone(),
346                    *size_bytes,
347                    content_type.clone(),
348                    event.timestamp,
349                );
350            }
351            EventPayload::FileDelete { path } => {
352                manifest.apply_delete(path);
353            }
354            EventPayload::FileRename { old_path, new_path } => {
355                manifest.apply_rename(old_path, new_path.clone());
356            }
357            _ => {}
358        }
359    }
360
361    let entries: Vec<ManifestEntry> = manifest.entries().values().cloned().collect();
362
363    Ok(entries)
364}
365
366/// Ensure the path starts with '/' for consistency.
367fn normalize_path(path: &str) -> String {
368    if path.starts_with('/') {
369        path.to_string()
370    } else {
371        format!("/{path}")
372    }
373}