mnem-http 0.1.6

HTTP JSON API for mnem - REST surface over the core repo operations.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
//! `POST /v1/ingest` - upload a source file (or JSON body with inline
//! text) and commit the resulting Doc + Chunk + Entity subgraph.
//!
//! Phase-B5d, HTTP half. Accepts one of two request shapes:
//!
//! - **`multipart/form-data`**: a `file` field carrying the raw source
//!   bytes, plus optional `ntype`, `chunker`, `max_tokens`, `overlap`
//!   text fields.
//! - **JSON body**: `{"text": "...", "ntype": "...", ...}` for
//!   callers that already have the payload in memory (typical for
//!   agent-driven ingests that don't want to round-trip through a
//!   temp file).
//!
//! The multipart shape is the primary surface; the JSON shape exists
//! so a pure-REST client (e.g. Postman snippet in a demo) can exercise
//! the path without a separate file upload.
//!
//! ## Size + tokens clamp
//!
//! File size is capped at 32 MiB (`MNEM_HTTP_INGEST_MAX_BYTES`),
//! `max_tokens` at 8192. Both mirror the MCP handler so a request
//! that migrates between transports sees the same ceiling. The cap
//! is a DoS guardrail, not a product shape; raise it in env if you
//! have a legitimate use case.

use std::time::Instant;

use axum::Json;
use axum::extract::{Multipart, State};
use mnem_ingest::{
    ChunkerAuto, ChunkerKind, IngestConfig, Ingester, NerConfig, SourceKind, auto_chunker,
};
use serde::Deserialize;
use serde_json::{Value, json};

use crate::error::Error;
use crate::state::AppState;

/// Upper bound on `max_tokens` accepted on `/v1/ingest`. Mirrors the
/// CLI + MCP ceilings.
pub(crate) const MAX_INGEST_TOKENS: u32 = 8192;

/// Default `MNEM_HTTP_INGEST_MAX_BYTES` cap on the raw file size.
/// 32 MiB covers typical Markdown / conversation exports / mid-sized
/// PDFs; operators hit the env override before the product shape ever
/// needs to change.
pub(crate) const DEFAULT_MAX_INGEST_BYTES: u64 = 32 * 1024 * 1024;

/// Resolve the per-request file-size cap.
///
/// `MNEM_HTTP_INGEST_MAX_BYTES` is consulted on every call, so an
/// operator can raise (or lower) the ceiling without restarting the
/// server. An unset or unparseable value falls back to
/// [`DEFAULT_MAX_INGEST_BYTES`].
fn max_ingest_bytes() -> u64 {
    match std::env::var("MNEM_HTTP_INGEST_MAX_BYTES") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_MAX_INGEST_BYTES),
        Err(_) => DEFAULT_MAX_INGEST_BYTES,
    }
}

/// JSON body for the non-multipart shape. Callers that already have
/// the payload in memory POST it under `Content-Type: application/json`
/// instead of a file upload.
///
/// Field names are the wire contract; every optional field uses
/// `#[serde(default)]` so omitting it is the same as `null`.
#[derive(Deserialize, Debug)]
pub(crate) struct IngestJsonBody {
    /// UTF-8 text of the source. Interpreted as `SourceKind::Text`
    /// unless `kind` is set. Its byte length is capped by
    /// `MNEM_HTTP_INGEST_MAX_BYTES` (see `max_ingest_bytes`).
    pub text: String,
    /// Optional explicit source kind (`"markdown" | "text" | "pdf" |
    /// "conversation"`). Defaults to `text`.
    #[serde(default)]
    pub kind: Option<String>,
    /// Override the Doc root `ntype`. Defaults to `"Doc"`.
    #[serde(default)]
    pub ntype: Option<String>,
    /// `auto | paragraph | recursive | session`. Defaults to `auto`.
    #[serde(default)]
    pub chunker: Option<String>,
    /// Target tokens per chunk. Clamped at [`MAX_INGEST_TOKENS`].
    #[serde(default)]
    pub max_tokens: Option<u32>,
    /// Overlap tokens between adjacent chunks (recursive chunker).
    #[serde(default)]
    pub overlap: Option<u32>,
    /// Commit author. Required; a whitespace-only value is rejected
    /// later in `run_ingest`.
    pub author: String,
    /// Commit message. Optional; default `"mnem http ingest"`.
    #[serde(default)]
    pub message: Option<String>,
    /// Extractor selector. `"none"` (default) keeps the rule-based
    /// [`mnem_ingest::RuleExtractor`]. `"keybert"` swaps in the
    /// statistical adapter, driven by the server's configured
    /// embedder. C3 FIX-3.
    #[serde(default)]
    pub extractor: Option<String>,
    /// NER provider. `"rule"` (default) uses the capitalized-phrase
    /// heuristic. `"none"` suppresses all entity extraction.
    /// Overrides the server's configured `[ner]` section for this
    /// request only.
    #[serde(default)]
    pub ner_provider: Option<String>,
}

/// `POST /v1/ingest` dispatcher. Detects the request's
/// `Content-Type`; multipart goes through [`ingest_multipart`], JSON
/// through [`ingest_json`]. Unrecognised content types fall back to
/// multipart parsing (which will surface a clear error on non-multipart
/// bodies).
pub(crate) async fn ingest(
    State(state): State<AppState>,
    multipart_or_json: axum::http::Request<axum::body::Body>,
) -> Result<Json<Value>, Error> {
    let content_type = multipart_or_json
        .headers()
        .get(axum::http::header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or("")
        .to_ascii_lowercase();

    if content_type.starts_with("application/json") {
        let bytes = axum::body::to_bytes(multipart_or_json.into_body(), usize::MAX)
            .await
            .map_err(|e| Error::bad_request(format!("reading body: {e}")))?;
        let body: IngestJsonBody = serde_json::from_slice(&bytes)
            .map_err(|e| Error::bad_request(format!("malformed JSON body: {e}")))?;
        ingest_json(state, body).await
    } else {
        // Every other content-type path (multipart/form-data with
        // boundary, empty, anything else) falls through to the
        // multipart parser, which returns a clean 400 on a malformed
        // body.
        let multipart = Multipart::from_request(multipart_or_json, &state)
            .await
            .map_err(|e| Error::bad_request(format!("multipart decode: {e}")))?;
        ingest_multipart(state, multipart).await
    }
}

/// Multipart variant. Expects a `file` field plus optional text fields.
async fn ingest_multipart(state: AppState, mut multipart: Multipart) -> Result<Json<Value>, Error> {
    let mut file_bytes: Option<Vec<u8>> = None;
    let mut file_name: Option<String> = None;
    let mut ntype: Option<String> = None;
    let mut chunker_str: Option<String> = None;
    let mut max_tokens: Option<u32> = None;
    let mut overlap: Option<u32> = None;
    let mut author: Option<String> = None;
    let mut message: Option<String> = None;
    let mut extractor: Option<String> = None;
    let mut ner_provider: Option<String> = None;

    let max_bytes = max_ingest_bytes();

    while let Some(field) = multipart
        .next_field()
        .await
        .map_err(|e| Error::bad_request(format!("multipart field: {e}")))?
    {
        let name = field.name().unwrap_or("").to_string();
        match name.as_str() {
            "file" => {
                file_name = field.file_name().map(ToString::to_string);
                let data = field
                    .bytes()
                    .await
                    .map_err(|e| Error::bad_request(format!("reading file field: {e}")))?;
                if data.len() as u64 > max_bytes {
                    return Err(Error::bad_request(format!(
                        "file field is {} bytes; exceeds the {max_bytes}-byte cap \
                         (raise MNEM_HTTP_INGEST_MAX_BYTES if legitimate)",
                        data.len()
                    )));
                }
                file_bytes = Some(data.to_vec());
            }
            "ntype" => ntype = Some(field_text(field).await?),
            "chunker" => chunker_str = Some(field_text(field).await?),
            "max_tokens" => {
                let s = field_text(field).await?;
                max_tokens = Some(
                    s.parse::<u32>()
                        .map_err(|e| Error::bad_request(format!("max_tokens: {e}")))?,
                );
            }
            "overlap" => {
                let s = field_text(field).await?;
                overlap = Some(
                    s.parse::<u32>()
                        .map_err(|e| Error::bad_request(format!("overlap: {e}")))?,
                );
            }
            "author" => author = Some(field_text(field).await?),
            "message" => message = Some(field_text(field).await?),
            "extractor" => extractor = Some(field_text(field).await?),
            "ner_provider" => ner_provider = Some(field_text(field).await?),
            other => {
                // Ignore unknown fields rather than 400: clients that
                // add forward-compatible metadata (trace_id,
                // client_version) shouldn't break here.
                tracing::debug!(field = %other, "ignoring unknown multipart field on /v1/ingest");
                let _ = field.bytes().await;
            }
        }
    }

    let bytes =
        file_bytes.ok_or_else(|| Error::bad_request("missing `file` field in multipart body"))?;
    let kind = file_name.as_ref().map_or(SourceKind::Text, |n| {
        Ingester::source_kind_for_path(std::path::Path::new(n))
    });
    let author =
        author.ok_or_else(|| Error::bad_request("missing `author` field in multipart body"))?;
    run_ingest(
        &state,
        &bytes,
        kind,
        IngestParams {
            ntype: ntype.unwrap_or_else(|| "Doc".into()),
            chunker: chunker_str.unwrap_or_else(|| "auto".into()),
            max_tokens: max_tokens.unwrap_or(512),
            overlap: overlap.unwrap_or(32),
            author,
            message: message.unwrap_or_else(|| "mnem http ingest".into()),
            extractor,
            ner_provider,
        },
    )
}

/// JSON variant.
///
/// `async` is kept for shape-symmetry with `ingest_multipart` (the
/// dispatcher `.await`s either branch uniformly); the function body
/// itself is sync because the JSON body has already been buffered by
/// the caller before we get here.
#[allow(clippy::unused_async)]
async fn ingest_json(state: AppState, body: IngestJsonBody) -> Result<Json<Value>, Error> {
    let max_bytes = max_ingest_bytes();
    if body.text.len() as u64 > max_bytes {
        return Err(Error::bad_request(format!(
            "text body is {} bytes; exceeds the {max_bytes}-byte cap \
             (raise MNEM_HTTP_INGEST_MAX_BYTES if legitimate)",
            body.text.len()
        )));
    }
    let kind = match body.kind.as_deref() {
        Some("markdown" | "md") => SourceKind::Markdown,
        Some("pdf") => SourceKind::Pdf,
        Some("conversation" | "json" | "jsonl") => SourceKind::Conversation,
        Some("text" | "txt") | None => SourceKind::Text,
        Some(other) => {
            return Err(Error::bad_request(format!(
                "unknown `kind`: {other}; want one of markdown|text|pdf|conversation"
            )));
        }
    };
    let bytes = body.text.into_bytes();
    run_ingest(
        &state,
        &bytes,
        kind,
        IngestParams {
            ntype: body.ntype.unwrap_or_else(|| "Doc".into()),
            chunker: body.chunker.unwrap_or_else(|| "auto".into()),
            max_tokens: body.max_tokens.unwrap_or(512),
            overlap: body.overlap.unwrap_or(32),
            author: body.author,
            message: body.message.unwrap_or_else(|| "mnem http ingest".into()),
            extractor: body.extractor,
            ner_provider: body.ner_provider,
        },
    )
}

/// Shared post-parse parameters for both multipart and JSON variants.
/// Built by `ingest_multipart` / `ingest_json` and consumed by
/// [`run_ingest`], which also validates/clamps the values.
struct IngestParams {
    /// Doc root `ntype`; both callers default this to `"Doc"`.
    ntype: String,
    /// Chunker selector string; resolved by `resolve_chunker`.
    chunker: String,
    /// Target tokens per chunk; clamped at `MAX_INGEST_TOKENS`.
    max_tokens: u32,
    /// Overlap tokens between adjacent chunks.
    overlap: u32,
    /// Commit author; rejected if whitespace-only.
    author: String,
    /// Commit message; whitespace-only is normalised to the default.
    message: String,
    /// C3 FIX-3: extractor selector, defaults to `"none"` (rule-based).
    extractor: Option<String>,
    /// NER provider override for this request. `None` defers to the
    /// server's `AppState::ner_cfg` (which itself falls back to Rule).
    ner_provider: Option<String>,
}

/// Shared execution path: clamp, build Ingester, run the pipeline,
/// commit, observe metrics, render the JSON response.
///
/// Returns a 400 [`Error`] for any validation failure (token cap,
/// empty author, unknown `ner_provider` / `extractor`, pipeline
/// errors); commit errors propagate via `?`.
fn run_ingest(
    state: &AppState,
    bytes: &[u8],
    kind: SourceKind,
    mut params: IngestParams,
) -> Result<Json<Value>, Error> {
    // Validate cheap things before any pipeline or lock work.
    if params.max_tokens > MAX_INGEST_TOKENS {
        return Err(Error::bad_request(format!(
            "max_tokens {} exceeds the {MAX_INGEST_TOKENS} cap",
            params.max_tokens
        )));
    }
    if params.author.trim().is_empty() {
        return Err(Error::bad_request("author is required"));
    }
    // Normalise whitespace-only message to the default.
    if params.message.trim().is_empty() {
        params.message = "mnem http ingest".into();
    }

    // Resolve NER config: per-request override → server default → Rule.
    let ner = match params.ner_provider.as_deref() {
        Some("none") => NerConfig::None,
        Some("rule") | None => state.ner_cfg.clone().unwrap_or(NerConfig::Rule),
        Some(other) => {
            return Err(Error::bad_request(format!(
                "unknown `ner_provider`: {other}; want one of rule|none"
            )));
        }
    };

    let chunker = resolve_chunker(&params.chunker, kind, params.max_tokens, params.overlap)?;
    let config = IngestConfig {
        chunker,
        ntype: params.ntype,
        max_tokens: params.max_tokens,
        overlap: params.overlap,
        ner,
    };
    let mut ing = Ingester::new(config);

    // C3 FIX-3: if the caller asked for the KeyBERT extractor, open
    // the server's configured embedder and wrap it in a
    // `KeyBertAdapter`. Zero cost when the flag is absent: the
    // default rule-based extractor stays wired.
    match params.extractor.as_deref() {
        None | Some("" | "none") => {}
        Some("keybert") => {
            let pc = state.embed_cfg.as_ref().ok_or_else(|| {
                Error::bad_request(
                    "extractor=keybert requires an [embed] provider configured on the server \
                     (MNEM_EMBED_PROVIDER / config.toml); none resolved",
                )
            })?;
            let boxed = mnem_embed_providers::open(pc).map_err(|e| {
                Error::bad_request(format!("opening embed provider for keybert: {e}"))
            })?;
            let arc: std::sync::Arc<dyn mnem_embed_providers::Embedder> =
                std::sync::Arc::from(boxed);
            ing = ing.with_extractor(Box::new(mnem_ingest::KeyBertAdapter::new(arc, "Keyword")));
        }
        Some(other) => {
            return Err(Error::bad_request(format!(
                "unknown `extractor`: {other}; want one of none|keybert"
            )));
        }
    }

    let started = Instant::now();
    // Take the repo lock for the whole ingest + commit + head-swap
    // sequence; a poisoned lock surfaces as Error::locked(). Any early
    // `?` return below drops the guard (and the uncommitted tx) on
    // the way out.
    let mut guard = state.repo.lock().map_err(|_| Error::locked())?;
    let mut tx = guard.start_transaction();
    let result = ing
        .ingest(&mut tx, bytes, kind)
        .map_err(|e| Error::bad_request(format!("ingest failed: {e}")))?;
    // Commit duration is observed separately from total ingest time.
    let commit_start = Instant::now();
    let new_repo = tx.commit(&params.author, &params.message)?;
    state
        .metrics
        .commit_duration
        .observe(commit_start.elapsed().as_secs_f64());

    // Capture the identifiers for the response before swapping the
    // committed repo into the shared slot.
    let op_id = new_repo.op_id().to_string();
    let commit_cid = new_repo
        .view()
        .heads
        .first()
        .map_or_else(|| "<none>".to_string(), ToString::to_string);
    *guard = new_repo;

    // Observe ingest-specific metrics: duration + chunk counter.
    // Both are registered at AppState construction time so a scrape
    // against a never-ingested server still emits zero-valued series
    // (no "metric appears out of nowhere" surprise for Prometheus).
    let elapsed = started.elapsed().as_secs_f64();
    state.metrics.ingest_duration.observe(elapsed);
    state.metrics.ingest_chunks.inc_by(result.chunk_count);

    Ok(Json(json!({
        "schema":         "mnem.v1.ingest",
        "op_id":          op_id,
        "commit_cid":     commit_cid,
        "node_count":     result.node_count,
        "chunk_count":    result.chunk_count,
        "entity_count":   result.entity_count,
        "relation_count": result.relation_count,
        "elapsed_ms":     result.elapsed_ms,
    })))
}

fn resolve_chunker(
    choice: &str,
    kind: SourceKind,
    max_tokens: u32,
    overlap: u32,
) -> Result<ChunkerKind, Error> {
    Ok(match choice.to_ascii_lowercase().as_str() {
        "auto" => auto_chunker(
            kind,
            ChunkerAuto {
                max_tokens: Some(max_tokens),
                overlap: Some(overlap),
                max_messages: None,
            },
        ),
        "paragraph" => ChunkerKind::Paragraph,
        "recursive" => ChunkerKind::Recursive {
            max_tokens,
            overlap,
        },
        "session" => ChunkerKind::Session { max_messages: 10 },
        other => {
            return Err(Error::bad_request(format!(
                "chunker must be one of auto|paragraph|recursive|session; got `{other}`"
            )));
        }
    })
}

/// Drain a multipart text field to a UTF-8 String. The multipart crate
/// surfaces bytes; most of our fields are short text, so `to_text` is
/// the right shape.
async fn field_text(field: axum::extract::multipart::Field<'_>) -> Result<String, Error> {
    field
        .text()
        .await
        .map_err(|e| Error::bad_request(format!("decoding text field: {e}")))
}

// `FromRequest` is used via UFCS in the dispatcher; bring the trait
// into scope so the call compiles without an unused-import warning.
use axum::extract::FromRequest;