Skip to main content

offline_intelligence/api/
attachment_api.rs

//! Pre-extraction endpoint for file attachments.
//!
//! `POST /attachments/preprocess` fires background extraction tasks the moment
//! a user attaches a file — so by the time they hit Send the content is already
//! cached and injection is zero-overhead.
//!
//! Hardware-adaptive concurrency:
//! - Plain-text formats (code, JSON, CSV, Markdown, …) bypass the semaphore —
//!   they are just an `fs::read` + UTF-8 decode, essentially free.
//! - Binary formats (PDF, DOCX, XLSX, PPTX, …) acquire a semaphore permit first.
//!   The semaphore is sized to `num_cpus / 2` (min 1, max 8) so the LLM server
//!   is never starved of CPU/RAM on low-spec hardware.

14use axum::{extract::State, http::StatusCode, Json};
15use serde::{Deserialize, Serialize};
16use tracing::{info, warn};
17
18use crate::api::stream_api::ChatAttachment;
19use crate::shared_state::{PreExtracted, UnifiedAppState};
20use crate::utils::{extract_content_from_bytes, is_extraction_sentinel};
21
/// How long a pre-extracted attachment stays fresh, in seconds (30 minutes).
///
/// An entry older than this is treated as stale and re-queued by the
/// preprocess endpoint. The background eviction task checks every 5 minutes.
pub const CACHE_TTL_SECS: u64 = 30 * 60;
25
26/// Build a stable, unique cache key for an attachment.
27pub fn attachment_cache_key(attach: &ChatAttachment) -> String {
28    match attach.source.as_str() {
29        "inline" => format!(
30            "inline:{}",
31            attach.file_path.as_deref().unwrap_or(&attach.name)
32        ),
33        "local_storage" => format!("local_storage:{}", attach.all_files_id.unwrap_or(0)),
34        other => format!("{}:{}", other, attach.name),
35    }
36}
37
/// Reports whether `name` looks like a pure-text format: reading it is just an
/// `fs::read` + UTF-8 decode with no binary parser, so it puts no meaningful
/// CPU/RAM pressure on the machine.
///
/// The compared suffix is everything after the last `.`, lower-cased. A name
/// with no dot is compared whole, which is how `Makefile` and `Dockerfile`
/// match.
fn is_plain_text_format(name: &str) -> bool {
    const PLAIN_TEXT_EXTENSIONS: &[&str] = &[
        // Prose and structured data
        "txt", "md", "markdown", "json", "jsonl", "csv", "tsv", "yaml", "yml", "toml", "xml",
        "html", "htm",
        // Stylesheets
        "css", "scss", "sass", "less",
        // Source code
        "rs", "py", "js", "ts", "jsx", "tsx", "mjs", "cjs", "java", "c", "cpp", "cc", "cxx", "h",
        "hpp", "cs", "go", "php", "rb", "swift", "kt", "scala", "sql",
        // Shell / scripting
        "sh", "bash", "zsh", "fish", "bat", "ps1", "psm1",
        // Config and logs
        "env", "log", "ini", "cfg", "conf", "properties",
        // Extension-less well-known files (matched by whole name or dotfile suffix)
        "dockerfile", "makefile", "gitignore", "gitattributes", "editorconfig", "eslintrc",
        "prettierrc",
    ];

    let suffix = match name.rsplit_once('.') {
        Some((_, after_dot)) => after_dot,
        None => name,
    };
    PLAIN_TEXT_EXTENSIONS.contains(&suffix.to_lowercase().as_str())
}
57
58#[derive(Debug, Deserialize)]
59pub struct PreprocessRequest {
60    pub attachments: Vec<ChatAttachment>,
61}
62
63#[derive(Debug, Serialize)]
64pub struct PreprocessResponse {
65    /// Files whose extraction was queued in the background.
66    pub queued: usize,
67    /// Files already in a fresh (non-stale) cache entry — no work needed.
68    pub already_cached: usize,
69}
70
71/// `POST /attachments/preprocess`
72///
73/// Accepts a list of file attachments and immediately spawns background
74/// extraction tasks — one per file. Returns before any extraction completes.
75///
76/// The next `/generate/stream` call for those same files will find the results
77/// in `shared_state.attachment_cache` and skip re-extraction entirely.
78pub async fn preprocess_attachments(
79    State(state): State<UnifiedAppState>,
80    Json(req): Json<PreprocessRequest>,
81) -> (StatusCode, Json<PreprocessResponse>) {
82    if req.attachments.is_empty() {
83        return (
84            StatusCode::OK,
85            Json(PreprocessResponse {
86                queued: 0,
87                already_cached: 0,
88            }),
89        );
90    }
91
92    let mut queued = 0usize;
93    let mut already_cached = 0usize;
94
95    for attach in req.attachments {
96        let key = attachment_cache_key(&attach);
97
98        // Skip if already cached and still fresh.
99        if let Some(entry) = state.shared_state.attachment_cache.get(&key) {
100            if !entry.is_stale(CACHE_TTL_SECS) {
101                already_cached += 1;
102                continue;
103            }
104            // Stale entry — fall through and re-queue.
105        }
106
107        queued += 1;
108        let state_clone = state.clone();
109        let attach_clone = attach.clone();
110        let key_clone = key.clone();
111        let is_plain = is_plain_text_format(&attach.name);
112
113        tokio::spawn(async move {
114            // Binary files acquire a semaphore permit before starting so that at
115            // most N heavy extractions run concurrently (N = num_cpus / 2, max 8).
116            // Plain-text files bypass the semaphore — they are I/O + decode only.
117            let _permit = if is_plain {
118                None
119            } else {
120                match state_clone
121                    .shared_state
122                    .extraction_semaphore
123                    .acquire()
124                    .await
125                {
126                    Ok(p) => Some(p),
127                    Err(_) => {
128                        warn!(
129                            "Extraction semaphore closed while pre-processing '{}'",
130                            attach_clone.name
131                        );
132                        return;
133                    }
134                }
135            };
136
137            match extract_for_cache(&attach_clone, &state_clone).await {
138                Ok(text) => {
139                    info!(
140                        "Pre-extracted '{}' ({} chars) → stored in cache",
141                        attach_clone.name,
142                        text.len()
143                    );
144                    state_clone.shared_state.attachment_cache.insert(
145                        key_clone,
146                        PreExtracted {
147                            text,
148                            extracted_at: std::time::Instant::now(),
149                        },
150                    );
151                }
152                Err(e) => {
153                    // Silently discard — the error surfaces again at Send time
154                    // via `try_extract_attachment`, which has proper user messages.
155                    warn!("Pre-extraction failed for '{}': {}", attach_clone.name, e);
156                }
157            }
158            // `_permit` is dropped here, releasing the semaphore slot.
159        });
160    }
161
162    info!(
163        "Attachment preprocess: {} queued, {} already cached",
164        queued, already_cached
165    );
166
167    (
168        StatusCode::OK,
169        Json(PreprocessResponse {
170            queued,
171            already_cached,
172        }),
173    )
174}
175
176/// Internal extraction helper for the preprocess path.
177/// Mirrors `try_extract_attachment` but returns `anyhow::Result<String>`
178/// (just the text) instead of the user-facing `Result<(String, String), String>`.
179async fn extract_for_cache(
180    attach: &ChatAttachment,
181    state: &UnifiedAppState,
182) -> anyhow::Result<String> {
183    match attach.source.as_str() {
184        "inline" => {
185            let path = attach
186                .file_path
187                .as_deref()
188                .ok_or_else(|| anyhow::anyhow!("no file_path on inline attachment"))?;
189            let bytes = tokio::fs::read(path)
190                .await
191                .map_err(|e| anyhow::anyhow!("fs::read '{}': {}", path, e))?;
192            let text = extract_content_from_bytes(&bytes, &attach.name)
193                .await
194                .map_err(|e| anyhow::anyhow!("extract '{}': {}", attach.name, e))?;
195            // Do not cache sentinel strings — let try_extract_attachment surface the
196            // proper user-facing error at Send time via HTTP 422.
197            if is_extraction_sentinel(&text) {
198                return Err(anyhow::anyhow!("extraction failed for '{}' — result is sentinel", attach.name));
199            }
200            Ok(text)
201        }
202
203        "local_storage" => {
204            let id = attach
205                .all_files_id
206                .ok_or_else(|| anyhow::anyhow!("no all_files_id on local_storage attachment"))?;
207            let bytes = state
208                .shared_state
209                .database_pool
210                .all_files
211                .get_file_bytes(id)
212                .map_err(|e| anyhow::anyhow!("db get_file_bytes id={}: {}", id, e))?;
213            let text = extract_content_from_bytes(&bytes, &attach.name)
214                .await
215                .map_err(|e| anyhow::anyhow!("extract '{}': {}", attach.name, e))?;
216            // Do not cache sentinel strings.
217            if is_extraction_sentinel(&text) {
218                return Err(anyhow::anyhow!("extraction failed for '{}' — result is sentinel", attach.name));
219            }
220            Ok(text)
221        }
222
223        other => Err(anyhow::anyhow!("unknown attachment source '{}'", other)),
224    }
225}