minigdb 0.1.0

An embedded property-graph database in Rust with a GQL query language, RocksDB-backed ACID storage, graph algorithms, and Python bindings
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
//! Axum-based HTTP server that serves the minigdb web GUI and a REST/JSON API.
//!
//! # Role
//! This module provides a browser-accessible interface to minigdb.  It is
//! compiled only when the `gui` Cargo feature is enabled.  The Axum server
//! runs in its own Tokio task alongside the TCP server started in
//! [`super::serve`].
//!
//! # Routes
//! | Method   | Path                         | Description                                         |
//! |----------|------------------------------|-----------------------------------------------------|
//! | `GET`    | `/`                          | Serves the embedded single-page HTML application.   |
//! | `GET`    | `/assets/minigdb_logo.webp`  | Serves the embedded logo image.                     |
//! | `GET`    | `/api/info`                  | Returns `{"auth_required": bool}`.                  |
//! | `POST`   | `/api/auth`                  | Exchanges `{user, password}` for a bearer token.    |
//! | `GET`    | `/api/graphs`                | Lists all user-visible graph names.                 |
//! | `POST`   | `/api/graphs`                | Creates a new named graph.                          |
//! | `DELETE` | `/api/graphs/:name`          | Drops (deletes) a named graph.                      |
//! | `POST`   | `/api/query`                 | Executes a GQL statement; returns rows.             |
//! | `POST`   | `/api/viz`                   | Executes GQL and enriches results with node/edge data for graph visualisation. |
//! | `POST`   | `/api/upload/nodes`          | Bulk-inserts nodes from a CSV string.               |
//! | `POST`   | `/api/upload/edges`          | Bulk-inserts edges from a CSV string (uses an `id_map`). |
//!
//! # Authentication
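//! When the server is configured with `auth_required = true`, every `/api/*`
//! route except `GET /api/info` and `POST /api/auth` requires an
//! `Authorization: Bearer <token>` header.  `GET /` and the embedded logo
//! asset are always served without authentication.  Tokens are 128-bit opaque
//! strings (32 hex characters) generated by [`gen_token`] and stored in-memory
//! in [`AppState::tokens`].  They never expire on their own; they are only
//! discarded when the server restarts.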
//!
//! # Visualisation (`/api/viz`)
//! The viz endpoint executes a GQL query, then scans every string value in the
//! result rows.  Any string that decodes as a valid ULID is looked up in the
//! graph as either a [`NodeId`] or an [`EdgeId`].  Resolved nodes and edges
//! (including the two endpoint nodes of each edge) are returned in parallel
//! `nodes` and `edges` arrays alongside the normal `rows` output, allowing
//! the browser to render a live graph visualisation without the client needing
//! to issue follow-up queries.
//!
//! # Key design decisions
//! - The HTML bundle is baked into the binary at compile time via
//!   `include_str!` so the server has no runtime file-system dependency.
//! - [`AppState`] is cheaply cloneable (`Arc` inside) and passed to every
//!   handler via Axum's `State` extractor.
//! - The `auth!` macro centralises the auth check so individual handlers stay
//!   readable.
//! - Transaction control keywords (`BEGIN`/`COMMIT`/`ROLLBACK`) sent as the
//!   entire body of `/api/query` are intercepted before reaching the GQL
//!   parser, which does not recognise them as statements.  Each HTTP request
//!   acquires and releases the graph mutex on its own, so the lock is not held
//!   between a `BEGIN` request and a later `COMMIT`/`ROLLBACK` request; other
//!   connections can touch the graph in between (unlike the TCP protocol).

use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use axum::{
    extract::{Path, State},
    http::{HeaderMap, HeaderValue, StatusCode},
    response::{Html, IntoResponse, Json, Response},
    routing::{delete, get, post},
    Router,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value as JsonValue};
use tokio::sync::Mutex;

use crate::types::{ulid_decode, ulid_encode, EdgeId, NodeId};
use super::{
    auth::{verify_password, ServerConfig},
    protocol::{row_to_json, value_to_json},
    registry::GraphRegistry,
};

// ── App state ─────────────────────────────────────────────────────────────────

/// State handed to every Axum handler through the [`State`] extractor.
///
/// Every field is an `Arc`, so the per-request clone Axum performs only bumps
/// reference counts.
#[derive(Clone)]
struct AppState {
    /// Registry of lazily-opened named graphs (also used by the TCP server).
    registry: Arc<GraphRegistry>,
    /// Server configuration: the `auth_required` switch and the user list.
    config: Arc<ServerConfig>,
    /// Bearer tokens issued by `POST /api/auth`.
    ///
    /// Held only in memory, so all tokens vanish when the process exits.  The
    /// set sits behind an async (Tokio) mutex that every authenticated request
    /// takes briefly for a membership check.
    tokens: Arc<Mutex<HashSet<String>>>,
}

// ── Token generation ──────────────────────────────────────────────────────────

/// Generate a pseudo-random 128-bit opaque bearer token.
///
/// The token is produced by combining:
/// - The current wall-clock time in nanoseconds (lower 64 bits), providing
///   uniqueness across server restarts and widely-spaced calls.
/// - A monotonically-incrementing counter mixed with the timestamp, ensuring
///   uniqueness even for calls within the same nanosecond (e.g. in tests).
///
/// The result is a 32-character lowercase hexadecimal string.  While this is
/// not cryptographically secure (no CSPRNG), it is sufficient for a
/// development/single-host deployment where token guessing is not a realistic
/// attack vector.
fn gen_token() -> String {
    // AtomicU64 counter survives across calls in the same process lifetime.
    static CTR: AtomicU64 = AtomicU64::new(0);
    let t = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos() as u64;
    // Mix the counter with the timestamp to avoid collisions within the same ns.
    let c = CTR.fetch_add(1, Ordering::Relaxed);
    format!("{:016x}{:016x}", t, c.wrapping_add(t >> 3))
}

// ── Auth helper ───────────────────────────────────────────────────────────────

/// Return `true` if the request is authorised to proceed.
///
/// When `state.config.server.auth_required` is `false`, every request is
/// allowed regardless of headers.  Otherwise the function requires an
/// `Authorization: Bearer <token>` header where `<token>` is present in
/// [`AppState::tokens`].
async fn is_auth(headers: &HeaderMap, state: &AppState) -> bool {
    if !state.config.server.auth_required {
        return true;
    }
    // Extract the raw Authorization header value.
    let Some(v) = headers.get("authorization").and_then(|h| h.to_str().ok()) else {
        return false;
    };
    // Must be a Bearer token — strip the prefix.
    let Some(tok) = v.strip_prefix("Bearer ") else { return false };
    // Check against the set of issued tokens.
    state.tokens.lock().await.contains(tok)
}

/// Early-return `401 UNAUTHORIZED` from the enclosing handler unless
/// [`is_auth`] accepts the request.
///
/// Invoke as the first statement of any handler that needs a session token:
/// `auth!(&headers, &state);`.  The enclosing function must return something
/// built from `.into_response()`.
macro_rules! auth {
    ($headers:expr, $state:expr) => {{
        let authorised = is_auth($headers, $state).await;
        if !authorised {
            return StatusCode::UNAUTHORIZED.into_response();
        }
    }};
}

// ── Public entry point ────────────────────────────────────────────────────────

/// Bind `addr` and serve the GUI + REST API indefinitely.
///
/// Constructs an [`AppState`], registers all routes, and calls
/// `axum::serve`.  This function is intended to be spawned as a background
/// Tokio task by the parent [`super::serve`] function.
///
/// # Arguments
/// - `addr` — the TCP address for the HTTP server (e.g. `0.0.0.0:7475`).
/// - `registry` — shared graph registry (also used by the TCP server).
/// - `config` — server configuration; shared with the TCP server so auth
///   settings remain consistent.
///
/// # Errors
/// Returns `Err` if the TCP listener cannot be bound or if Axum fails to
/// start.
pub async fn serve(
    addr: SocketAddr,
    registry: Arc<GraphRegistry>,
    config: Arc<ServerConfig>,
) -> std::io::Result<()> {
    let state = AppState {
        registry,
        config,
        tokens: Arc::new(Mutex::new(HashSet::new())),
    };
    let app = Router::new()
        .route("/", get(serve_html))
        .route("/assets/minigdb_logo.webp", get(serve_logo))
        .route("/api/info", get(api_info))
        .route("/api/auth", post(api_auth))
        .route("/api/graphs", get(api_graphs).post(api_create_graph))
        .route("/api/graphs/:name", delete(api_drop_graph))
        .route("/api/query", post(api_query))
        .route("/api/viz", post(api_viz))
        .route("/api/upload/nodes", post(api_upload_nodes))
        .route("/api/upload/edges", post(api_upload_edges))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind(addr).await?;
    eprintln!("minigdb GUI  → http://{addr}");
    axum::serve(listener, app)
        .await
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
}

// ── Static HTML ───────────────────────────────────────────────────────────────

/// Single-page GUI markup, compiled into the binary with `include_str!`.
///
/// Because the page is embedded, the server needs no files at runtime and
/// ships as one executable.
static HTML: &str = include_str!("static/gui.html");
/// Logo image bytes, compiled into the binary with `include_bytes!`.
static LOGO: &[u8] = include_bytes!("static/assets/minigdb_logo.webp");

/// `GET /` — respond with the embedded GUI page as `text/html`.
async fn serve_html() -> Html<&'static str> {
    let page: &'static str = HTML;
    Html(page)
}

/// `GET /assets/minigdb_logo.webp` — respond with the embedded logo bytes,
/// labelled `image/webp`.
async fn serve_logo() -> Response {
    let body = axum::body::Body::from(LOGO);
    let content_type = HeaderValue::from_static("image/webp");
    let mut response = Response::new(body);
    response
        .headers_mut()
        .insert(axum::http::header::CONTENT_TYPE, content_type);
    response
}

// ── /api/info ─────────────────────────────────────────────────────────────────

/// `GET /api/info` — describe server capabilities; no authentication needed.
///
/// The response body is `{"auth_required": bool}`.  The browser reads it first
/// to decide whether to show the login form.
async fn api_info(State(s): State<AppState>) -> impl IntoResponse {
    let auth_required = s.config.server.auth_required;
    Json(json!({ "auth_required": auth_required }))
}

// ── /api/auth ─────────────────────────────────────────────────────────────────

/// Request body for `POST /api/auth`.
#[derive(Deserialize)]
struct AuthBody {
    user: String,
    password: String,
}

/// `POST /api/auth` — exchange credentials for a session token.
///
/// If `auth_required` is `false`, returns an empty token (`""`) so the
/// browser can proceed without prompting for credentials.
///
/// On success returns `{"token": "<hex>"}`.
/// On failure returns `401` with `{"error": "invalid credentials"}`.
async fn api_auth(State(s): State<AppState>, Json(body): Json<AuthBody>) -> impl IntoResponse {
    if !s.config.server.auth_required {
        // No auth — return an empty token so the client can proceed.
        return Json(json!({ "token": "" })).into_response();
    }
    match s.config.find_user(&body.user) {
        Some(u) if verify_password(&body.password, &u.password_hash) => {
            // Credentials valid: generate a token, store it, return it.
            let tok = gen_token();
            s.tokens.lock().await.insert(tok.clone());
            Json(json!({ "token": tok })).into_response()
        }
        _ => (StatusCode::UNAUTHORIZED, Json(json!({ "error": "invalid credentials" }))).into_response(),
    }
}

// ── /api/graphs ───────────────────────────────────────────────────────────────

/// `GET /api/graphs` — list user-visible graph names known to the registry.
///
/// System graphs (names starting with `_`) are excluded.
/// Returns `{"graphs": ["default", "analytics", ...]}`.
async fn api_graphs(headers: HeaderMap, State(s): State<AppState>) -> impl IntoResponse {
    auth!(&headers, &s);
    // `registry.list()` is expected to omit system graphs already.  The original
    // comment promised defence-in-depth but did no filtering.  Filtering here
    // enforces the `_` rule at the HTTP boundary too, matching the guard in
    // `api_drop_graph`.
    let graphs: Vec<_> = s
        .registry
        .list()
        .await
        .into_iter()
        .filter(|name| !name.starts_with('_'))
        .collect();
    Json(json!({ "graphs": graphs })).into_response()
}

/// Request body for `POST /api/graphs`.
#[derive(Deserialize)]
struct CreateBody {
    /// Name of the graph to create.  Must be alphanumeric + `_`/`-`, ≤ 64 chars.
    name: String,
}

/// `POST /api/graphs` — create a new named graph.
///
/// Returns `{}` on success or `400` with `{"error": "..."}` on failure
/// (e.g. the name is invalid or the graph already exists).
async fn api_create_graph(
    headers: HeaderMap,
    State(s): State<AppState>,
    Json(body): Json<CreateBody>,
) -> impl IntoResponse {
    auth!(&headers, &s);
    match s.registry.create(&body.name).await {
        Ok(()) => Json(json!({})).into_response(),
        Err(e) => (StatusCode::BAD_REQUEST, Json(json!({ "error": e.to_string() }))).into_response(),
    }
}

/// `DELETE /api/graphs/:name` — drop (permanently delete) a named graph.
///
/// Returns `{}` on success or `400` with `{"error": "..."}` on failure
/// (e.g. the graph does not exist or the name is a system graph).
async fn api_drop_graph(
    headers: HeaderMap,
    State(s): State<AppState>,
    Path(name): Path<String>,
) -> impl IntoResponse {
    auth!(&headers, &s);
    if name.starts_with('_') {
        return (StatusCode::BAD_REQUEST, Json(json!({ "error": format!("cannot drop system graph '{name}'") }))).into_response();
    }
    match s.registry.drop_graph(&name).await {
        Ok(()) => Json(json!({})).into_response(),
        Err(e) => (StatusCode::BAD_REQUEST, Json(json!({ "error": e.to_string() }))).into_response(),
    }
}

// ── /api/query ────────────────────────────────────────────────────────────────

/// Shared request body for `POST /api/query` and `POST /api/viz`.
#[derive(Deserialize)]
struct QueryBody {
    /// Optional graph name; defaults to `"default"` if omitted.
    graph: Option<String>,
    /// The GQL statement to execute.
    query: String,
}

/// `POST /api/query` — execute a GQL statement and return the result rows.
///
/// Acquires a lock on the target graph for the duration of the request.
/// Transaction control keywords (`BEGIN`, `COMMIT`, `ROLLBACK`) are handled
/// specially:
/// - They are intercepted before the query reaches the GQL parser.
/// - Over HTTP each request holds its own lock, so a `BEGIN`/`COMMIT` pair
///   must occur within the same HTTP request body (or the caller must accept
///   that the transaction is not isolated from other connections).
///
/// Returns `{"rows": [...], "elapsed_ms": <f64>}` on success, or
/// `{"error": "..."}` on failure.
async fn api_query(
    headers: HeaderMap,
    State(s): State<AppState>,
    Json(body): Json<QueryBody>,
) -> impl IntoResponse {
    auth!(&headers, &s);
    let graph_name = body.graph.as_deref().unwrap_or("default");
    let arc = match s.registry.get_or_open(graph_name).await {
        Ok(a) => a,
        Err(e) => return Json(json!({ "error": e.to_string() })).into_response(),
    };
    let start = Instant::now();
    let mut guard = arc.lock().await;
    let (graph, txn_id) = &mut *guard;

    // Handle transaction control keywords before passing to the GQL parser,
    // which does not recognise them as valid statements.
    let bare = body.query.trim().trim_end_matches(';').trim().to_ascii_uppercase();
    let result: Result<Vec<HashMap<String, JsonValue>>, _> = match bare.as_str() {
        "BEGIN"    => graph.begin_transaction().map(|_| vec![]),
        "COMMIT"   => graph.commit_transaction().map(|_| vec![]),
        "ROLLBACK" => graph.rollback_transaction().map(|_| vec![]),
        _ => crate::query_capturing(&body.query, graph, txn_id)
                .map(|(rows, _)| rows.iter().map(row_to_json).collect()),
    };
    match result {
        Ok(json_rows) => Json(json!({
            "rows": json_rows,
            "elapsed_ms": start.elapsed().as_secs_f64() * 1000.0
        }))
        .into_response(),
        Err(e) => Json(json!({ "error": e.to_string() })).into_response(),
    }
}

// ── /api/viz ──────────────────────────────────────────────────────────────────

/// One node in the `nodes` array of a `/api/viz` response.
#[derive(Serialize)]
struct VizNode {
    /// The node's [`NodeId`], as a ULID string.
    id: String,
    /// Every label on the node.
    labels: Vec<String>,
    /// The node's properties, each converted with `value_to_json`.
    properties: HashMap<String, JsonValue>,
}

/// One edge in the `edges` array of a `/api/viz` response.
#[derive(Serialize)]
struct VizEdge {
    /// The edge's [`EdgeId`], as a ULID string.
    id: String,
    /// ULID string of the node the edge starts at (`from_node`).
    source: String,
    /// ULID string of the node the edge ends at (`to_node`).
    target: String,
    /// Relationship type, e.g. `"KNOWS"`.
    label: String,
    /// The edge's properties, each converted with `value_to_json`.
    properties: HashMap<String, JsonValue>,
}

/// `POST /api/viz` — execute GQL and enrich the results with node/edge data.
///
/// Executes the given query identically to `/api/query`, then post-processes
/// the result rows to extract graph topology:
///
/// 1. Every string value in every row is tested with [`ulid_decode`].
/// 2. Decodable strings are looked up in the graph as a [`NodeId`] or
///    [`EdgeId`] via [`enrich`].
/// 3. Resolved nodes and edges (plus the endpoint nodes of each edge) are
///    collected into deduplicated maps.
///
/// The response includes the normal `rows` array plus two additional arrays:
/// ```json
/// {
///   "rows": [...],
///   "nodes": [{"id":"...", "labels":[...], "properties":{...}}, ...],
///   "edges": [{"id":"...", "source":"...", "target":"...", "label":"...", "properties":{...}}, ...],
///   "elapsed_ms": 1.23
/// }
/// ```
/// The browser uses the `nodes` and `edges` arrays to render a Cytoscape.js
/// graph without issuing any follow-up requests.
async fn api_viz(
    headers: HeaderMap,
    State(s): State<AppState>,
    Json(body): Json<QueryBody>,
) -> impl IntoResponse {
    auth!(&headers, &s);
    let graph_name = body.graph.as_deref().unwrap_or("default");
    let arc = match s.registry.get_or_open(graph_name).await {
        Ok(a) => a,
        Err(e) => return Json(json!({ "error": e.to_string() })).into_response(),
    };
    let start = Instant::now();
    let mut guard = arc.lock().await;
    let (graph, txn_id) = &mut *guard;
    let rows = match crate::query_capturing(&body.query, graph, txn_id) {
        Ok((rows, _)) => rows,
        Err(e) => return Json(json!({ "error": e.to_string() })).into_response(),
    };

    // Scan all string values in the result rows for ULID-encoded IDs.
    // Using HashMap keyed by ULID string ensures each node/edge appears once
    // even if the same ID appears in multiple rows or columns.
    let mut nodes: HashMap<String, VizNode> = HashMap::new();
    let mut edges: HashMap<String, VizEdge> = HashMap::new();

    for row in &rows {
        for val in row.values() {
            // Only string values can encode a ULID; skip all other types.
            if let crate::Value::String(s) = val {
                enrich(s, graph, &mut nodes, &mut edges);
            }
        }
    }

    let json_rows: Vec<_> = rows.iter().map(row_to_json).collect();
    Json(json!({
        "rows": json_rows,
        "nodes": nodes.into_values().collect::<Vec<_>>(),
        "edges": edges.into_values().collect::<Vec<_>>(),
        "elapsed_ms": start.elapsed().as_secs_f64() * 1000.0,
    }))
    .into_response()
}

/// Attempt to resolve `s` as a node or edge ULID and insert it into the
/// appropriate accumulator map.
///
/// # Algorithm
/// 1. Short-circuit if `s` is already present in either map (deduplication).
/// 2. Attempt to decode `s` as a ULID via [`ulid_decode`]; silently return on
///    failure — most strings in query results are not ULIDs.
/// 3. Try `graph.get_node(NodeId(raw))` first, then `graph.get_edge(EdgeId(raw))`.
/// 4. For an edge, recursively call `enrich` on the source and target node
///    ULIDs so that both endpoint nodes always appear in the `nodes` map,
///    even if the query did not explicitly select them.
///
/// # Parameters
/// - `s` — the string value to test.
/// - `graph` — read-only view of the graph for `get_node`/`get_edge` lookups.
/// - `nodes` — accumulator for resolved nodes, keyed by ULID string.
/// - `edges` — accumulator for resolved edges, keyed by ULID string.
fn enrich(
    s: &str,
    graph: &crate::Graph,
    nodes: &mut HashMap<String, VizNode>,
    edges: &mut HashMap<String, VizEdge>,
) {
    // Skip if already resolved (avoids redundant lookups and infinite recursion
    // when an edge's endpoint node ULID also appears in the result rows).
    if nodes.contains_key(s) || edges.contains_key(s) {
        return;
    }
    // Attempt ULID decode; bail silently if `s` is not a valid ULID.
    let Ok(raw) = ulid_decode(s) else { return };

    if let Some(node) = graph.get_node(NodeId(raw)) {
        // Resolved as a node: build VizNode and insert.
        let properties = node
            .properties
            .iter()
            .map(|(k, v)| (k.clone(), value_to_json(v)))
            .collect();
        nodes.insert(
            s.to_string(),
            VizNode { id: s.to_string(), labels: node.labels.clone(), properties },
        );
    } else if let Some(edge) = graph.get_edge(EdgeId(raw)) {
        // Resolved as an edge: encode endpoint node IDs as ULIDs and build VizEdge.
        let properties = edge
            .properties
            .iter()
            .map(|(k, v)| (k.clone(), value_to_json(v)))
            .collect();
        let src = ulid_encode(edge.from_node.0);
        let tgt = ulid_encode(edge.to_node.0);
        edges.insert(
            s.to_string(),
            VizEdge {
                id: s.to_string(),
                source: src.clone(),
                target: tgt.clone(),
                label: edge.label.clone(),
                properties,
            },
        );
        // Recursively enrich the endpoint nodes so the visualiser always has
        // complete node data even when the query only returned edge IDs.
        enrich(&src, graph, nodes, edges);
        enrich(&tgt, graph, nodes, edges);
    }
    // If the ULID matched neither a node nor an edge, it is from an unrelated
    // domain (e.g. a ULID used as a user-defined property value) — ignore it.
}

// ── /api/upload/nodes ─────────────────────────────────────────────────────────

/// Request body for `POST /api/upload/nodes`.
#[derive(Deserialize)]
struct UploadNodesBody {
    /// Optional target graph name; defaults to `"default"`.
    graph: Option<String>,
    /// CSV content as a UTF-8 string.
    csv: String,
    /// Default node label when the CSV has no `:LABEL` column.
    label: Option<String>,
}

/// `POST /api/upload/nodes` — bulk-insert nodes from a CSV string.
///
/// Accepts a JSON body with `{"csv": "...", "graph": "...", "label": "..."}`.
/// The CSV must use the `:ID` / `:LABEL` column conventions described in
/// [`crate::csv_import`].
///
/// Returns `{"inserted": N, "id_map": {"user_id": "ulid", ...}}` on success.
/// The `id_map` should be passed back when uploading the corresponding edge CSV
/// via `/api/upload/edges`.
async fn api_upload_nodes(
    headers: HeaderMap,
    State(s): State<AppState>,
    Json(body): Json<UploadNodesBody>,
) -> impl IntoResponse {
    auth!(&headers, &s);
    let graph_name = body.graph.as_deref().unwrap_or("default");
    let arc = match s.registry.get_or_open(graph_name).await {
        Ok(a) => a,
        Err(e) => return Json(json!({ "error": e.to_string() })).into_response(),
    };
    let mut guard = arc.lock().await;
    let (graph, _txn_id) = &mut *guard;

    match crate::csv_import::load_nodes_csv(body.csv.as_bytes(), graph, body.label.as_deref()) {
        Ok(result) => {
            let id_map_json = crate::csv_import::id_map_to_strings(&result.id_map);
            Json(json!({
                "inserted": result.inserted,
                "id_map": id_map_json,
            })).into_response()
        }
        Err(e) => Json(json!({ "error": e.to_string() })).into_response(),
    }
}

// ── /api/upload/edges ─────────────────────────────────────────────────────────

/// Request body for `POST /api/upload/edges`.
#[derive(Deserialize)]
struct UploadEdgesBody {
    /// Optional target graph name; defaults to `"default"`.
    graph: Option<String>,
    /// CSV content as a UTF-8 string.
    csv: String,
    /// Default edge label when the CSV has no `:TYPE` column.
    label: Option<String>,
    /// Maps user-supplied `:ID` strings to ULID strings returned by a prior
    /// `/api/upload/nodes` call.  If absent, no edges can be resolved.
    id_map: Option<HashMap<String, String>>,
}

/// `POST /api/upload/edges` — bulk-insert edges from a CSV string.
///
/// Accepts a JSON body with `{"csv": "...", "id_map": {...}, "graph": "...", "label": "..."}`.
/// `id_map` should be the `id_map` object returned by a preceding
/// `/api/upload/nodes` request; edges whose `:START_ID` / `:END_ID` values are
/// not in the map are silently skipped and counted in `skipped`.
///
/// Returns `{"inserted": N, "skipped": M}` on success.
async fn api_upload_edges(
    headers: HeaderMap,
    State(s): State<AppState>,
    Json(body): Json<UploadEdgesBody>,
) -> impl IntoResponse {
    auth!(&headers, &s);
    let graph_name = body.graph.as_deref().unwrap_or("default");
    let arc = match s.registry.get_or_open(graph_name).await {
        Ok(a) => a,
        Err(e) => return Json(json!({ "error": e.to_string() })).into_response(),
    };
    let mut guard = arc.lock().await;
    let (graph, _txn_id) = &mut *guard;

    // Decode id_map from ULID strings to NodeIds.
    let id_map = body.id_map
        .as_ref()
        .map(|m| crate::csv_import::id_map_from_strings(m))
        .unwrap_or_default();

    match crate::csv_import::load_edges_csv(body.csv.as_bytes(), graph, &id_map, body.label.as_deref()) {
        Ok(result) => Json(json!({
            "inserted": result.inserted,
            "skipped":  result.skipped,
        })).into_response(),
        Err(e) => Json(json!({ "error": e.to_string() })).into_response(),
    }
}