Skip to main content

lago_api/routes/
files.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use axum::Json;
5use axum::body::Bytes;
6use axum::extract::{Path, Query, State};
7use axum::http::StatusCode;
8use serde::{Deserialize, Serialize};
9
10use lago_core::event::{EventEnvelope, EventPayload};
11use lago_core::hashline::{HashLineEdit, HashLineFile};
12use lago_core::id::{BranchId, EventId, SessionId};
13use lago_core::{EventQuery, ManifestEntry};
14
15use crate::error::ApiError;
16use crate::state::AppState;
17
18// --- Response types
19
20#[derive(Serialize)]
21pub struct FileWriteResponse {
22    pub path: String,
23    pub blob_hash: String,
24    pub size_bytes: u64,
25}
26
27#[derive(Serialize)]
28pub struct ManifestResponse {
29    pub session_id: String,
30    pub entries: Vec<ManifestEntry>,
31}
32
33// --- Query types
34
35#[derive(Debug, Deserialize, Default)]
36pub struct FileReadQuery {
37    /// Optional format: "hashline" returns content in hashline format.
38    #[serde(default)]
39    pub format: Option<String>,
40    /// Branch to read from (default: main).
41    #[serde(default = "default_branch")]
42    pub branch: String,
43}
44
45#[derive(Debug, Deserialize, Default)]
46pub struct BranchQuery {
47    /// Branch to operate on (default: main).
48    #[serde(default = "default_branch")]
49    pub branch: String,
50}
51
/// Name of the branch used when a request does not specify one.
fn default_branch() -> String {
    String::from("main")
}
55
56// --- Handlers
57
58/// GET /v1/sessions/:id/files/*path
59///
60/// Reads a file from the session's virtual filesystem by replaying the
61/// manifest from journal events and fetching the blob from the store.
62///
63/// Supports `?format=hashline` to return content in hashline format
64/// (`N:HHHH|content` per line) with `x-format: hashline` header.
65pub async fn read_file(
66    State(state): State<Arc<AppState>>,
67    Path((session_id, file_path)): Path<(String, String)>,
68    Query(query): Query<FileReadQuery>,
69) -> Result<axum::http::Response<axum::body::Body>, ApiError> {
70    let session_id = SessionId::from_string(session_id.clone());
71    let branch_id = BranchId::from_string(query.branch.clone());
72    let file_path = normalize_path(&file_path);
73
74    // Verify session exists
75    state
76        .journal
77        .get_session(&session_id)
78        .await?
79        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
80
81    // Build manifest from events
82    let manifest = build_manifest(&state, &session_id, &branch_id).await?;
83
84    let entry = manifest
85        .iter()
86        .find(|e| e.path == file_path)
87        .ok_or_else(|| ApiError::NotFound(format!("file not found: {file_path}")))?;
88
89    let data = state
90        .blob_store
91        .get(&entry.blob_hash)
92        .map_err(|e| ApiError::Internal(format!("failed to read blob: {e}")))?;
93
94    // If hashline format requested, convert
95    if query.format.as_deref() == Some("hashline") {
96        let text = String::from_utf8(data)
97            .map_err(|_| ApiError::BadRequest("file is not valid UTF-8".to_string()))?;
98        let hashline_file = HashLineFile::from_content(&text);
99        let hashline_text = hashline_file.to_hashline_text();
100
101        return Ok(axum::http::Response::builder()
102            .status(StatusCode::OK)
103            .header("content-type", "text/plain")
104            .header("x-format", "hashline")
105            .header("x-blob-hash", entry.blob_hash.as_str())
106            .body(axum::body::Body::from(hashline_text))
107            .unwrap());
108    }
109
110    let content_type = entry
111        .content_type
112        .clone()
113        .unwrap_or_else(|| "application/octet-stream".to_string());
114
115    Ok(axum::http::Response::builder()
116        .status(StatusCode::OK)
117        .header("content-type", content_type)
118        .header("x-blob-hash", entry.blob_hash.as_str())
119        .body(axum::body::Body::from(data))
120        .unwrap())
121}
122
123/// PATCH /v1/sessions/:id/files/*path
124///
125/// Applies hashline edits to a file. Accepts a JSON array of `HashLineEdit`
126/// operations, applies them to the current file content, stores the result
127/// as a new blob, and emits a `FileWrite` event.
128pub async fn patch_file(
129    State(state): State<Arc<AppState>>,
130    Path((session_id, file_path)): Path<(String, String)>,
131    Query(branch): Query<BranchQuery>,
132    Json(edits): Json<Vec<HashLineEdit>>,
133) -> Result<(StatusCode, Json<FileWriteResponse>), ApiError> {
134    let session_id = SessionId::from_string(session_id.clone());
135    let branch_id = BranchId::from_string(branch.branch);
136    let file_path = normalize_path(&file_path);
137
138    // Verify session exists
139    state
140        .journal
141        .get_session(&session_id)
142        .await?
143        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
144
145    // Build manifest and read current file
146    let manifest = build_manifest(&state, &session_id, &branch_id).await?;
147    let entry = manifest
148        .iter()
149        .find(|e| e.path == file_path)
150        .ok_or_else(|| ApiError::NotFound(format!("file not found: {file_path}")))?;
151
152    let data = state
153        .blob_store
154        .get(&entry.blob_hash)
155        .map_err(|e| ApiError::Internal(format!("failed to read blob: {e}")))?;
156
157    let text = String::from_utf8(data)
158        .map_err(|_| ApiError::BadRequest("file is not valid UTF-8".to_string()))?;
159
160    // Apply hashline edits
161    let hashline_file = HashLineFile::from_content(&text);
162    let new_content = hashline_file
163        .apply_edits(&edits)
164        .map_err(lago_core::LagoError::from)?;
165
166    // Store new blob
167    let blob_hash = state
168        .blob_store
169        .put(new_content.as_bytes())
170        .map_err(|e| ApiError::Internal(format!("failed to store blob: {e}")))?;
171
172    let size_bytes = new_content.len() as u64;
173
174    // Emit a FileWrite event
175    let event = EventEnvelope {
176        event_id: EventId::new(),
177        session_id: session_id.clone(),
178        branch_id,
179        run_id: None,
180        seq: 0,
181        timestamp: EventEnvelope::now_micros(),
182        parent_id: None,
183        payload: EventPayload::FileWrite {
184            path: file_path.clone(),
185            blob_hash: blob_hash.clone().into(),
186            size_bytes,
187            content_type: None,
188        },
189        metadata: HashMap::new(),
190        schema_version: 1,
191    };
192
193    state.journal.append(event).await?;
194
195    Ok((
196        StatusCode::OK,
197        Json(FileWriteResponse {
198            path: file_path,
199            blob_hash: blob_hash.to_string(),
200            size_bytes,
201        }),
202    ))
203}
204
205/// PUT /v1/sessions/:id/files/*path
206///
207/// Writes a file to the session's virtual filesystem. The file contents are
208/// stored as a blob and a `FileWrite` event is appended to the journal.
209pub async fn write_file(
210    State(state): State<Arc<AppState>>,
211    Path((session_id, file_path)): Path<(String, String)>,
212    Query(branch): Query<BranchQuery>,
213    body: Bytes,
214) -> Result<(StatusCode, Json<FileWriteResponse>), ApiError> {
215    let session_id = SessionId::from_string(session_id.clone());
216    let branch_id = BranchId::from_string(branch.branch);
217    let file_path = normalize_path(&file_path);
218
219    // Verify session exists
220    state
221        .journal
222        .get_session(&session_id)
223        .await?
224        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
225
226    // Store the blob
227    let blob_hash = state
228        .blob_store
229        .put(&body)
230        .map_err(|e| ApiError::Internal(format!("failed to store blob: {e}")))?;
231
232    let size_bytes = body.len() as u64;
233
234    // Emit a FileWrite event
235    let event = EventEnvelope {
236        event_id: EventId::new(),
237        session_id: session_id.clone(),
238        branch_id,
239        run_id: None,
240        seq: 0,
241        timestamp: EventEnvelope::now_micros(),
242        parent_id: None,
243        payload: EventPayload::FileWrite {
244            path: file_path.clone(),
245            blob_hash: blob_hash.clone().into(),
246            size_bytes,
247            content_type: None,
248        },
249        metadata: HashMap::new(),
250        schema_version: 1,
251    };
252
253    state.journal.append(event).await?;
254
255    Ok((
256        StatusCode::CREATED,
257        Json(FileWriteResponse {
258            path: file_path,
259            blob_hash: blob_hash.to_string(),
260            size_bytes,
261        }),
262    ))
263}
264
265/// DELETE /v1/sessions/:id/files/*path
266///
267/// Removes a file from the session's virtual filesystem by appending a
268/// `FileDelete` event.
269pub async fn delete_file(
270    State(state): State<Arc<AppState>>,
271    Path((session_id, file_path)): Path<(String, String)>,
272    Query(branch): Query<BranchQuery>,
273) -> Result<StatusCode, ApiError> {
274    let session_id = SessionId::from_string(session_id.clone());
275    let branch_id = BranchId::from_string(branch.branch);
276    let file_path = normalize_path(&file_path);
277
278    // Verify session exists
279    state
280        .journal
281        .get_session(&session_id)
282        .await?
283        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
284
285    let event = EventEnvelope {
286        event_id: EventId::new(),
287        session_id: session_id.clone(),
288        branch_id,
289        run_id: None,
290        seq: 0,
291        timestamp: EventEnvelope::now_micros(),
292        parent_id: None,
293        payload: EventPayload::FileDelete {
294            path: file_path.clone(),
295        },
296        metadata: HashMap::new(),
297        schema_version: 1,
298    };
299
300    state.journal.append(event).await?;
301
302    Ok(StatusCode::NO_CONTENT)
303}
304
305/// GET /v1/sessions/:id/manifest
306///
307/// Returns the full manifest (list of all files) for a session by replaying
308/// file events from the journal.
309pub async fn get_manifest(
310    State(state): State<Arc<AppState>>,
311    Path(session_id): Path<String>,
312    Query(branch): Query<BranchQuery>,
313) -> Result<Json<ManifestResponse>, ApiError> {
314    let session_id = SessionId::from_string(session_id.clone());
315    let branch_id = BranchId::from_string(branch.branch);
316
317    // Verify session exists
318    state
319        .journal
320        .get_session(&session_id)
321        .await?
322        .ok_or_else(|| ApiError::NotFound(format!("session not found: {session_id}")))?;
323
324    let entries = build_manifest(&state, &session_id, &branch_id).await?;
325
326    Ok(Json(ManifestResponse {
327        session_id: session_id.to_string(),
328        entries,
329    }))
330}
331
332// --- Internal helpers
333
334/// Build a manifest by replaying file events from the journal.
335async fn build_manifest(
336    state: &Arc<AppState>,
337    session_id: &SessionId,
338    branch_id: &BranchId,
339) -> Result<Vec<ManifestEntry>, ApiError> {
340    let query = EventQuery::new()
341        .session(session_id.clone())
342        .branch(branch_id.clone());
343    let events = state.journal.read(query).await?;
344
345    let mut manifest = lago_fs::Manifest::new();
346
347    for event in &events {
348        match &event.payload {
349            EventPayload::FileWrite {
350                path,
351                blob_hash,
352                size_bytes,
353                content_type,
354            } => {
355                // Convert aios_protocol::BlobHash -> lago_core::BlobHash
356                manifest.apply_write(
357                    path.clone(),
358                    lago_core::BlobHash::from_hex(blob_hash.as_str()),
359                    *size_bytes,
360                    content_type.clone(),
361                    event.timestamp,
362                );
363            }
364            EventPayload::FileDelete { path } => {
365                manifest.apply_delete(path);
366            }
367            EventPayload::FileRename { old_path, new_path } => {
368                manifest.apply_rename(old_path, new_path.clone());
369            }
370            _ => {}
371        }
372    }
373
374    let entries: Vec<ManifestEntry> = manifest.entries().values().cloned().collect();
375
376    Ok(entries)
377}
378
/// Return `path` with a leading '/' guaranteed.
///
/// A path that already begins with '/' is returned unchanged; otherwise a
/// single '/' is prepended. No other normalization is performed.
fn normalize_path(path: &str) -> String {
    let mut normalized = String::with_capacity(path.len() + 1);
    if !path.starts_with('/') {
        normalized.push('/');
    }
    normalized.push_str(path);
    normalized
}
386}