Skip to main content

batuta/serve/banco/
handlers_data.rs

//! Data management handlers — file upload, listing, info/schema inspection, and deletion.
2
3use axum::{
4    extract::{Multipart, State},
5    http::StatusCode,
6    response::Json,
7};
8
9use super::state::BancoState;
10use super::storage::FileInfo;
11use super::types::ErrorResponse;
12
13/// POST /api/v1/data/upload — upload files via multipart form.
14pub async fn upload_handler(
15    State(state): State<BancoState>,
16    mut multipart: Multipart,
17) -> Result<Json<Vec<FileInfo>>, (StatusCode, Json<ErrorResponse>)> {
18    let mut uploaded = Vec::new();
19
20    while let Ok(Some(field)) = multipart.next_field().await {
21        let name = field.file_name().unwrap_or("unnamed").to_string();
22        let data = field.bytes().await.map_err(|e| {
23            (
24                StatusCode::BAD_REQUEST,
25                Json(ErrorResponse::new(format!("Failed to read field: {e}"), "upload_error", 400)),
26            )
27        })?;
28
29        if data.is_empty() {
30            continue;
31        }
32
33        let info = state.files.store(&name, &data);
34        // Auto-index for RAG
35        let text = String::from_utf8_lossy(&data);
36        state.rag.index_document(&info.id, &info.name, &text);
37        // Emit event
38        state.events.emit(&super::events::BancoEvent::FileUploaded {
39            file_id: info.id.clone(),
40            name: info.name.clone(),
41        });
42        uploaded.push(info);
43    }
44
45    if uploaded.is_empty() {
46        return Err((
47            StatusCode::BAD_REQUEST,
48            Json(ErrorResponse::new("No files uploaded", "no_files", 400)),
49        ));
50    }
51
52    Ok(Json(uploaded))
53}
54
55/// POST /api/v1/data/upload/json — upload a file via JSON body (simpler for testing).
56pub async fn upload_json_handler(
57    State(state): State<BancoState>,
58    Json(request): Json<UploadJsonRequest>,
59) -> Json<FileInfo> {
60    let info = state.files.store(&request.name, request.content.as_bytes());
61    // Auto-index for RAG
62    state.rag.index_document(&info.id, &info.name, &request.content);
63    // Emit event
64    state.events.emit(&super::events::BancoEvent::FileUploaded {
65        file_id: info.id.clone(),
66        name: info.name.clone(),
67    });
68    Json(info)
69}
70
71/// JSON upload request (alternative to multipart).
72#[derive(Debug, serde::Deserialize)]
73pub struct UploadJsonRequest {
74    pub name: String,
75    pub content: String,
76}
77
78/// GET /api/v1/data/files — list all uploaded files.
79pub async fn list_files_handler(State(state): State<BancoState>) -> Json<FilesListResponse> {
80    Json(FilesListResponse { files: state.files.list() })
81}
82
83/// DELETE /api/v1/data/files/:id — delete an uploaded file.
84pub async fn delete_file_handler(
85    State(state): State<BancoState>,
86    axum::extract::Path(id): axum::extract::Path<String>,
87) -> Result<StatusCode, (StatusCode, Json<ErrorResponse>)> {
88    state.files.delete(&id).map(|()| StatusCode::NO_CONTENT).map_err(|_| {
89        (
90            StatusCode::NOT_FOUND,
91            Json(ErrorResponse::new(format!("File {id} not found"), "not_found", 404)),
92        )
93    })
94}
95
96/// GET /api/v1/data/files/:id/info — file details + schema (for structured files).
97pub async fn file_info_handler(
98    State(state): State<BancoState>,
99    axum::extract::Path(id): axum::extract::Path<String>,
100) -> Result<Json<FileInfoDetail>, (StatusCode, Json<ErrorResponse>)> {
101    let info = state.files.get(&id).ok_or((
102        StatusCode::NOT_FOUND,
103        Json(ErrorResponse::new(format!("File {id} not found"), "not_found", 404)),
104    ))?;
105
106    let content = state.files.read_content(&id);
107    let preview_lines: Vec<String> = content
108        .as_ref()
109        .map(|bytes| String::from_utf8_lossy(bytes).lines().take(5).map(String::from).collect())
110        .unwrap_or_default();
111
112    let schema = detect_schema(&info.content_type, content.as_deref());
113
114    Ok(Json(FileInfoDetail { info, preview_lines, schema }))
115}
116
/// Detect schema for structured file types using alimentar's Arrow-backed loaders.
///
/// Recognised content types are `text/csv`, `application/json` and `application/jsonl`.
/// MIME parameters such as `; charset=utf-8` are ignored, and matching is case-insensitive.
///
/// Returns `None` when:
/// - there is no content,
/// - the content is not valid UTF-8,
/// - the type is not recognised, or
/// - alimentar fails to parse the content.
///
/// Each field's `data_type` is the `Debug` rendering of its Arrow data type.
#[cfg(feature = "alimentar")]
fn detect_schema(content_type: &str, content: Option<&[u8]>) -> Option<Vec<SchemaField>> {
    use alimentar::{ArrowDataset, Dataset};

    let bytes = content?;
    let text = std::str::from_utf8(bytes).ok()?;
    // Spreadsheet exports often start with a UTF-8 byte-order mark. It would otherwise be
    // glued to the first CSV column name, or make the JSON parser reject the document.
    let text = text.strip_prefix('\u{feff}').unwrap_or(text);
    // Compare only the MIME essence so `text/csv; charset=utf-8` is still recognised.
    let mime = content_type
        .split(';')
        .next()
        .unwrap_or("")
        .trim()
        .to_ascii_lowercase();

    let dataset = match mime.as_str() {
        "text/csv" => ArrowDataset::from_csv_str(text).ok()?,
        "application/json" | "application/jsonl" => ArrowDataset::from_json_str(text).ok()?,
        _ => return None,
    };

    let schema = dataset.schema();
    Some(
        schema
            .fields()
            .iter()
            .map(|f| SchemaField {
                name: f.name().clone(),
                data_type: format!("{:?}", f.data_type()),
                nullable: f.is_nullable(),
            })
            .collect(),
    )
}
144
145/// Schema detection fallback (no alimentar).
146#[cfg(not(feature = "alimentar"))]
147fn detect_schema(content_type: &str, content: Option<&[u8]>) -> Option<Vec<SchemaField>> {
148    let bytes = content?;
149    let text = std::str::from_utf8(bytes).ok()?;
150
151    match content_type {
152        "text/csv" => {
153            let header = text.lines().next()?;
154            Some(
155                header
156                    .split(',')
157                    .map(|col| SchemaField {
158                        name: col.trim().to_string(),
159                        data_type: "Utf8".to_string(),
160                        nullable: true,
161                    })
162                    .collect(),
163            )
164        }
165        _ => None,
166    }
167}
168
169/// File list response.
170#[derive(Debug, serde::Serialize)]
171pub struct FilesListResponse {
172    pub files: Vec<FileInfo>,
173}
174
175/// Detailed file info with preview and schema.
176#[derive(Debug, serde::Serialize)]
177pub struct FileInfoDetail {
178    #[serde(flatten)]
179    pub info: FileInfo,
180    pub preview_lines: Vec<String>,
181    #[serde(skip_serializing_if = "Option::is_none")]
182    pub schema: Option<Vec<SchemaField>>,
183}
184
185/// Schema field descriptor.
186#[derive(Debug, Clone, serde::Serialize)]
187pub struct SchemaField {
188    pub name: String,
189    pub data_type: String,
190    pub nullable: bool,
191}