Skip to main content

nedb_engine/
server.rs

1//! nedbd v2 HTTP server — same /v1/databases/* API surface as v1.
2//! Drop-in replacement: Vision, itsl_mirror, all existing clients work unchanged.
3//!
4//! Built on tokio + axum. Each database is opened once and held in an Arc<RwLock>.
5//! All write paths use the Db's internal atomic operations; the RwLock is only
6//! needed to protect the manager's HashMap (open/close operations), not individual
7//! document writes (which are lock-free at the content-addressed level).
8
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::sync::atomic::AtomicU64;
13
14use axum::{
15    extract::{Path as AxPath, State, Query as AxQuery},
16    http::{HeaderMap, StatusCode},
17    response::{IntoResponse, Response, sse::{Event, KeepAlive, Sse}},
18    routing::{delete, get, post},
19    Json, Router,
20};
21use dashmap::DashMap;
22use serde::Deserialize;
23use serde_json::{json, Value};
24use tokio::sync::{broadcast, RwLock};
25use tokio_stream::wrappers::BroadcastStream;
26use tokio_stream::StreamExt as _;
27
28use crate::db::Db;
29use crate::nql;
30use crate::store::Node;
31
32// ── Log channel — broadcast to all /events SSE subscribers ────────────────────
33
34const LOG_CHANNEL_CAP: usize = 512;
35const SUB_CHANNEL_CAP: usize = 256;
36
37// ── Subscription registry ─────────────────────────────────────────────────────
38// Maps (db_name, sub_id) → (nql_query, result_hash, event_sender)
39// After every write, all registered queries for that db are re-evaluated.
40// Diffs (added/removed/changed rows) are emitted as SSE events.
41
42type SubKey = (String, u64);  // (db_name, sub_id)
43type SubVal = (String, String, broadcast::Sender<String>);  // (nql, last_hash, tx)
44
45/// Send a timestamped log line to both stdout and all /events subscribers.
46macro_rules! nlog {
47    ($tx:expr, $($arg:tt)*) => {{
48        let line = format!($($arg)*);
49        println!("{}", line);
50        let _ = $tx.send(line);
51    }};
52}
53
54// ── Manager ───────────────────────────────────────────────────────────────────
55
56#[derive(Clone)]
57pub struct Manager {
58    inner:     Arc<RwLock<ManagerInner>>,
59    pub token: Option<String>,
60    /// Broadcast channel — every log line goes here; /events streams them.
61    pub log_tx: broadcast::Sender<String>,
62    /// Live query subscriptions: (db_name, sub_id) → (nql, last_hash, event_tx)
63    subs:    Arc<DashMap<SubKey, SubVal>>,
64    sub_ctr: Arc<AtomicU64>,
65}
66
67struct ManagerInner {
68    data_dir:    PathBuf,
69    dbs:         HashMap<String, Arc<Db>>,
70    tmk:         Option<[u8; 32]>,
71    memory_mode: bool,
72}
73
74impl Manager {
75    pub fn new(data_dir: &Path, tmk: Option<[u8; 32]>, token: Option<String>, memory_mode: bool) -> Self {
76        let (log_tx, _) = broadcast::channel(LOG_CHANNEL_CAP);
77        Self {
78            inner: Arc::new(RwLock::new(ManagerInner {
79                data_dir: data_dir.to_path_buf(),
80                dbs:      HashMap::new(),
81                tmk,
82                memory_mode,
83            })),
84            token,
85            log_tx,
86            subs:    Arc::new(DashMap::new()),
87            sub_ctr: Arc::new(AtomicU64::new(1)),
88        }
89    }
90
91    /// Register a live query subscription. Returns (sub_id, receiver).
92    fn subscribe(&self, db: &str, nql: String) -> (u64, broadcast::Receiver<String>) {
93        use std::sync::atomic::Ordering;
94        let id = self.sub_ctr.fetch_add(1, Ordering::Relaxed);
95        let (tx, rx) = broadcast::channel(SUB_CHANNEL_CAP);
96        self.subs.insert((db.to_string(), id), (nql, String::new(), tx));
97        (id, rx)
98    }
99
100    /// Unregister a subscription.
101    fn unsubscribe(&self, db: &str, sub_id: u64) {
102        self.subs.remove(&(db.to_string(), sub_id));
103    }
104
105    /// After a write: re-evaluate all subscriptions for `db`, emit diffs.
106    fn notify_subscribers(&self, db: &str, db_arc: &Arc<crate::db::Db>) {
107        let keys: Vec<SubKey> = self.subs.iter()
108            .filter(|e| e.key().0 == db)
109            .map(|e| e.key().clone())
110            .collect();
111
112        for key in keys {
113            if let Some(mut entry) = self.subs.get_mut(&key) {
114                let (nql, last_hash, tx) = entry.value_mut();
115                // Re-run the query
116                let rows = match crate::nql::query(db_arc, nql) {
117                    Ok((rows, _)) => rows,
118                    Err(_) => continue,
119                };
120                // Hash the result set
121                let new_hash = format!("{:?}", rows.iter().map(|r| r.to_string()).collect::<Vec<_>>());
122                if new_hash == *last_hash { continue; }
123                *last_hash = new_hash;
124                // Send the full current result as a diff event
125                let event = json!({
126                    "sub_id": key.1,
127                    "db":     &key.0,
128                    "nql":    nql.as_str(),
129                    "rows":   rows,
130                    "count":  rows.len(),
131                });
132                let _ = tx.send(event.to_string());
133            }
134        }
135    }
136
137    /// Open all existing databases in the data directory on startup.
138    pub async fn open_all(&self) -> anyhow::Result<()> {
139        let (data_dir, tmk, memory_mode) = {
140            let inner = self.inner.read().await;
141            (inner.data_dir.clone(), inner.tmk, inner.memory_mode)
142        };
143        // In memory mode: nothing to open from disk — all DBs created on first write
144        if memory_mode { return Ok(()); }
145        if !data_dir.exists() {
146            std::fs::create_dir_all(&data_dir)?;
147            return Ok(());
148        }
149        let mut names = vec![];
150        for entry in std::fs::read_dir(&data_dir)? {
151            let entry = entry?;
152            if entry.file_type()?.is_dir() {
153                names.push(entry.file_name().to_string_lossy().to_string());
154            }
155        }
156        let log_tx = self.log_tx.clone();
157        let mut inner = self.inner.write().await;
158        for name in names {
159            let db_path = inner.data_dir.join(&name);
160            let dek = tmk.map(|k| crate::store::Dek::from_tmk(&k, name.as_bytes()));
161            match Db::open(&db_path, dek) {
162                Ok(db) => {
163                    nlog!(log_tx, "  [nedbd] opened database {:?}", name);
164                    let db_arc = Arc::new(db);
165                    Db::start_cold_scan(Arc::clone(&db_arc));
166                    // Flush MANIFEST every 1s in background — removes I/O from write path
167                    Db::start_manifest_ticker(Arc::clone(&db_arc), 1000);
168                    inner.dbs.insert(name, db_arc);
169                }
170                Err(e) => nlog!(log_tx, "  [nedbd] ERROR opening {:?}: {}", name, e),
171            }
172        }
173        Ok(())
174    }
175
176    async fn get_db(&self, name: &str) -> Option<Arc<Db>> {
177        self.inner.read().await.dbs.get(name).cloned()
178    }
179
180    async fn create_db(&self, name: &str) -> anyhow::Result<Arc<Db>> {
181        let (data_dir, tmk, memory_mode) = {
182            let inner = self.inner.read().await;
183            (inner.data_dir.clone(), inner.tmk, inner.memory_mode)
184        };
185        let db = if memory_mode {
186            // Pure in-memory — instant, no files
187            Arc::new(Db::in_memory())
188        } else {
189            let db_path = data_dir.join(name);
190            let dek = tmk.map(|k| crate::store::Dek::from_tmk(&k, name.as_bytes()));
191            let db = Arc::new(Db::open(&db_path, dek)?);
192            Db::start_cold_scan(Arc::clone(&db));
193            Db::start_manifest_ticker(Arc::clone(&db), 1000);
194            db
195        };
196        self.inner.write().await.dbs.insert(name.to_string(), db.clone());
197        Ok(db)
198    }
199
200    async fn drop_db(&self, name: &str) -> bool {
201        let db = self.inner.write().await.dbs.remove(name);
202        if let Some(db) = db {
203            // Flush manifest before dropping
204            db.flush_manifest_if_dirty();
205            let data_dir = self.inner.read().await.data_dir.clone();
206            let _ = std::fs::remove_dir_all(data_dir.join(name));
207            true
208        } else {
209            false
210        }
211    }
212
213    /// Flush all open databases (id-index WAL + MANIFEST) — call on graceful shutdown.
214    pub async fn flush_all(&self) {
215        let inner = self.inner.read().await;
216        for db in inner.dbs.values() {
217            db.flush_all();  // WAL + manifest
218        }
219    }
220
221    async fn names(&self) -> Vec<String> {
222        self.inner.read().await.dbs.keys().cloned().collect()
223    }
224
225    /// Emit a log line to stdout and all /events SSE subscribers.
226    pub fn log(&self, msg: impl Into<String>) {
227        let line = msg.into();
228        println!("{}", line);
229        let _ = self.log_tx.send(line);
230    }
231
232    fn check_auth(&self, headers: &HeaderMap) -> bool {
233        match &self.token {
234            None => true,
235            Some(required) => {
236                if let Some(auth) = headers.get("authorization") {
237                    if let Ok(s) = auth.to_str() {
238                        return s == format!("Bearer {}", required);
239                    }
240                }
241                false
242            }
243        }
244    }
245}
246
247// ── Error helpers ─────────────────────────────────────────────────────────────
248
249fn err(status: StatusCode, msg: &str) -> Response {
250    (status, Json(json!({"error": msg}))).into_response()
251}
252
253fn ok(body: Value) -> Response {
254    (StatusCode::OK, Json(body)).into_response()
255}
256
257/// Return (seq, head) — both O(1) reads from in-memory atomics/cache.
258/// The head is maintained incrementally by Db::put() and Db::delete()
259/// so we never recompute it from scratch on every response.
260fn db_seq_head(db: &Db) -> (u64, String) {
261    let seq  = db.seq.load(std::sync::atomic::Ordering::SeqCst);
262    let head = db.head();
263    (seq, head)
264}
265
266// ── Route handlers ────────────────────────────────────────────────────────────
267
268async fn health(State(mgr): State<Manager>) -> Response {
269    let names = mgr.names().await;
270    let inner = mgr.inner.read().await;
271    ok(json!({
272        "ok":        true,
273        "service":   "nedbd",
274        "version":   env!("CARGO_PKG_VERSION"),
275        "engine":    "dag",
276        "memory":    inner.memory_mode,
277        "databases": names,
278        "encrypted": inner.tmk.is_some(),
279    }))
280}
281
282async fn list_databases(State(mgr): State<Manager>, headers: HeaderMap) -> Response {
283    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
284    let names = mgr.names().await;
285    let summaries: Vec<Value> = {
286        let inner = mgr.inner.read().await;
287        names.iter().map(|n| {
288            if let Some(db) = inner.dbs.get(n) {
289                let (seq, head) = db_seq_head(db);
290                json!({"name": n, "seq": seq, "head": head, "collections": db.id_index.collections()})
291            } else {
292                json!({"name": n})
293            }
294        }).collect()
295    };
296    ok(json!({"databases": summaries}))
297}
298
299#[derive(Deserialize)]
300struct CreateDbBody { name: String }
301
302async fn create_database(
303    State(mgr): State<Manager>,
304    headers: HeaderMap,
305    Json(body): Json<CreateDbBody>,
306) -> Response {
307    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
308    if body.name.is_empty() { return err(StatusCode::BAD_REQUEST, "name is required"); }
309    match mgr.create_db(&body.name).await {
310        Ok(db) => {
311            let (seq, head) = db_seq_head(&db);
312            (StatusCode::CREATED, Json(json!({"database": {"name": body.name, "seq": seq, "head": head}}))).into_response()
313        }
314        Err(e) => err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
315    }
316}
317
318async fn get_database(
319    State(mgr): State<Manager>,
320    headers: HeaderMap,
321    AxPath(name): AxPath<String>,
322) -> Response {
323    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
324    match mgr.get_db(&name).await {
325        None => err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
326        Some(db) => {
327            let (seq, head) = db_seq_head(&db);
328            ok(json!({"name": name, "seq": seq, "head": head, "collections": db.id_index.collections()}))
329        }
330    }
331}
332
333async fn drop_database(
334    State(mgr): State<Manager>,
335    headers: HeaderMap,
336    AxPath(name): AxPath<String>,
337) -> Response {
338    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
339    let dropped = mgr.drop_db(&name).await;
340    ok(json!({"dropped": dropped}))
341}
342
343#[derive(Deserialize)]
344struct QueryBody { nql: String }
345
346async fn query_database(
347    State(mgr): State<Manager>,
348    headers: HeaderMap,
349    AxPath(name): AxPath<String>,
350    Json(body): Json<QueryBody>,
351) -> Response {
352    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
353    let db = match mgr.get_db(&name).await {
354        None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
355        Some(db) => db,
356    };
357    if body.nql.trim().is_empty() {
358        return err(StatusCode::BAD_REQUEST, "nql is required");
359    }
360    match nql::query(&db, &body.nql) {
361        Ok((rows, count)) => {
362            let (seq, head) = db_seq_head(&db);
363            ok(json!({"rows": rows, "count": count, "seq": seq, "head": head}))
364        }
365        Err(e) => err(StatusCode::BAD_REQUEST, &format!("NQL error: {}", e)),
366    }
367}
368
369#[derive(Deserialize)]
370struct PutBody {
371    coll:       String,
372    id:         String,
373    doc:        Value,
374    caused_by:  Option<Vec<serde_json::Value>>,
375    valid_from: Option<String>,
376    valid_to:   Option<String>,
377    #[allow(dead_code)] evidence:   Option<String>,
378    #[allow(dead_code)] confidence: Option<f64>,
379    #[allow(dead_code)] client:     Option<String>,
380    #[allow(dead_code)] nonce:      Option<u64>,
381    #[allow(dead_code)] idem:       Option<String>,
382}
383
384#[derive(Deserialize)]
385struct LinkBody {
386    frm: String,
387    rel: String,
388    to:  String,
389}
390
391async fn put_document(
392    State(mgr): State<Manager>,
393    headers: HeaderMap,
394    AxPath(name): AxPath<String>,
395    Json(body): Json<PutBody>,
396) -> Response {
397    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
398    let db = match mgr.get_db(&name).await {
399        None => {
400            // Auto-create database on first write
401            match mgr.create_db(&name).await {
402                Ok(db) => db,
403                Err(e) => return err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
404            }
405        }
406        Some(db) => db,
407    };
408    // Block writes until background startup scan completes (cold start only).
409    // Reads and queries always proceed immediately.
410    if !db.startup_ready.load(std::sync::atomic::Ordering::SeqCst) {
411        return err(StatusCode::SERVICE_UNAVAILABLE,
412            "database startup in progress — reads available, writes retry in a moment");
413    }
414    // Resolve caused_by items: accept hash strings (v2 native) OR seq integers (v1 compat).
415    let caused_by: Vec<String> = body.caused_by.unwrap_or_default()
416        .into_iter()
417        .filter_map(|v| match v {
418            serde_json::Value::String(s) => Some(s),
419            serde_json::Value::Number(n) => {
420                n.as_u64().and_then(|seq| db.get_hash_by_seq(seq))
421            }
422            _ => None,
423        })
424        .collect();
425    // Run synchronous file I/O (objects.write) on a blocking thread so concurrent
426    // PUTs don't serialize on the tokio async thread pool.
427    let coll = body.coll.clone();
428    let id   = body.id.clone();
429    let doc  = body.doc.clone();
430    let vf   = body.valid_from.clone();
431    let vt   = body.valid_to.clone();
432    let db2  = Arc::clone(&db);
433    let result = tokio::task::spawn_blocking(move || {
434        db2.put(&coll, &id, doc, caused_by, vf, vt)
435    }).await;
436    match result {
437        Err(join_err) => err(StatusCode::INTERNAL_SERVER_ERROR, &join_err.to_string()),
438        Ok(Err(e))    => err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
439        Ok(Ok(node))  => {
440            let (seq, head) = db_seq_head(&db);
441            mgr.notify_subscribers(&name, &db);
442            ok(json!({"ok": true, "doc": node_to_response(&node), "seq": seq, "head": head}))
443        }
444    }
445}
446
447fn node_to_response(node: &Node) -> Value {
448    json!({
449        "_id":   node.id,
450        "_hash": node.hash,
451        "_seq":  node.seq,
452        "_coll": node.coll,
453        "data":  node.data,
454    })
455}
456
457async fn link_document(
458    State(mgr): State<Manager>,
459    headers: HeaderMap,
460    AxPath(name): AxPath<String>,
461    Json(body): Json<LinkBody>,
462) -> Response {
463    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
464    let db = match mgr.get_db(&name).await {
465        None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
466        Some(db) => db,
467    };
468    if !db.startup_ready.load(std::sync::atomic::Ordering::SeqCst) {
469        return err(StatusCode::SERVICE_UNAVAILABLE, "startup scan in progress");
470    }
471    match db.link(&body.frm, &body.rel, &body.to) {
472        Ok(()) => {
473            let (seq, head) = db_seq_head(&db);
474            ok(json!({"ok": true, "frm": body.frm, "rel": body.rel, "to": body.to, "seq": seq, "head": head}))
475        }
476        Err(e) => err(StatusCode::BAD_REQUEST, &e.to_string()),
477    }
478}
479
480async fn delete_document(
481    State(mgr): State<Manager>,
482    headers: HeaderMap,
483    AxPath((name, coll, id)): AxPath<(String, String, String)>,
484) -> Response {
485    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
486    let db = match mgr.get_db(&name).await {
487        None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
488        Some(db) => db,
489    };
490    // v2 DAG: tombstone write + id index removal — doc history is preserved in the DAG,
491    // but the live id pointer is cleared so queries and list() never return the doc.
492    let existed = match db.delete(&coll, &id) {
493        Ok(v)  => v,
494        Err(e) => return err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
495    };
496    let (seq, head) = db_seq_head(&db);
497    ok(json!({"ok": existed, "seq": seq, "head": head}))
498}
499
500#[derive(Deserialize)]
501struct BatchOp {
502    op:  String,
503    coll: Option<String>,
504    id:  Option<String>,
505    doc: Option<Value>,
506    caused_by: Option<Vec<serde_json::Value>>,
507}
508#[derive(Deserialize)]
509struct BatchBody { ops: Vec<BatchOp> }
510
511async fn batch_operations(
512    State(mgr): State<Manager>,
513    headers: HeaderMap,
514    AxPath(name): AxPath<String>,
515    Json(body): Json<BatchBody>,
516) -> Response {
517    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
518    let db = match mgr.get_db(&name).await {
519        None => match mgr.create_db(&name).await {
520            Ok(db) => db,
521            Err(e) => return err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
522        },
523        Some(db) => db,
524    };
525
526    if !db.startup_ready.load(std::sync::atomic::Ordering::SeqCst) {
527        return err(StatusCode::SERVICE_UNAVAILABLE,
528            "database startup in progress — reads available, writes retry in a moment");
529    }
530
531    // Split ops into puts (parallelisable) and deletes (sequential)
532    // Puts go through put_batch for parallel object + index writes.
533    // Deletes remain sequential (tombstone ordering matters).
534    let mut put_ops = vec![];
535    let mut del_ops: Vec<(String, String)> = vec![];
536    let mut op_order: Vec<(&str, usize)> = vec![];  // ("put"|"del", index into respective vec)
537
538    for op in &body.ops {
539        let t = op.op.to_lowercase();
540        match t.as_str() {
541            "put" => {
542                // Resolve caused_by items: accept hash strings (v2 native) OR seq integers (v1 compat).
543                let caused_by: Vec<String> = op.caused_by.clone().unwrap_or_default()
544                    .into_iter()
545                    .filter_map(|v| match v {
546                        serde_json::Value::String(s) => Some(s),
547                        serde_json::Value::Number(n) => {
548                            n.as_u64().and_then(|seq| db.get_hash_by_seq(seq))
549                        }
550                        _ => None,
551                    })
552                    .collect();
553                op_order.push(("put", put_ops.len()));
554                put_ops.push((
555                    op.coll.clone().unwrap_or_default(),
556                    op.id.clone().unwrap_or_default(),
557                    op.doc.clone().unwrap_or(json!({})),
558                    caused_by,
559                    None::<String>,
560                    None::<String>,
561                ));
562            }
563            "del" | "delete" => {
564                op_order.push(("del", del_ops.len()));
565                del_ops.push((
566                    op.coll.clone().unwrap_or_default(),
567                    op.id.clone().unwrap_or_default(),
568                ));
569            }
570            _ => { op_order.push(("unknown", 0)); }
571        }
572    }
573
574    // Execute all puts in parallel via put_batch
575    let put_results = if put_ops.is_empty() {
576        vec![]
577    } else {
578        match db.put_batch(put_ops) {
579            Ok(nodes) => nodes.into_iter().map(|n| json!({"op":"put","id":n.id,"seq":n.seq,"hash":n.hash})).collect(),
580            Err(e)    => return err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
581        }
582    };
583
584    // Execute deletes sequentially
585    let del_results: Vec<serde_json::Value> = del_ops.iter().map(|(coll, id)| {
586        match db.delete(coll, id) {
587            Ok(existed) => json!({"op":"del","id":id,"ok":existed}),
588            Err(e)      => json!({"op":"del","id":id,"error":e.to_string()}),
589        }
590    }).collect();
591
592    // Reconstruct results in original op order
593    let mut results = vec![];
594    for (kind, idx) in &op_order {
595        let r = match *kind {
596            "put"     => put_results.get(*idx).cloned().unwrap_or(json!({"op":"put","error":"missing"})),
597            "del"     => del_results.get(*idx).cloned().unwrap_or(json!({"op":"del","error":"missing"})),
598            _         => json!({"op": kind, "error": "unknown op"}),
599        };
600        results.push(r);
601    }
602    let (seq, head) = db_seq_head(&db);
603    // Notify live query subscribers after batch completes
604    mgr.notify_subscribers(&name, &db);
605    ok(json!({"results": results, "count": results.len(), "seq": seq, "head": head}))
606}
607
608#[derive(Deserialize)]
609struct IndexBody { coll: String, field: String, kind: Option<String> }
610
611async fn create_index(
612    State(mgr): State<Manager>,
613    headers: HeaderMap,
614    AxPath(name): AxPath<String>,
615    Json(body): Json<IndexBody>,
616) -> Response {
617    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
618    let db = match mgr.get_db(&name).await {
619        None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
620        Some(db) => db,
621    };
622    let kind = body.kind.as_deref().unwrap_or("eq");
623    match kind {
624        "sorted" | "eq" => {
625            db.create_sorted_index(&body.coll, &body.field);
626            ok(json!({"ok": true, "coll": body.coll, "field": body.field, "kind": kind}))
627        }
628        _ => err(StatusCode::BAD_REQUEST, &format!("unknown index kind: {}", kind)),
629    }
630}
631
632async fn verify_database(
633    State(mgr): State<Manager>,
634    headers: HeaderMap,
635    AxPath(name): AxPath<String>,
636) -> Response {
637    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
638    let db = match mgr.get_db(&name).await {
639        None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
640        Some(db) => db,
641    };
642    let (ok_count, tampered) = db.verify();
643    let (seq, head) = db_seq_head(&db);
644    ok(json!({
645        "ok": tampered.is_empty(),
646        "seq": seq,
647        "head": head,
648        "tamper_evident": true,
649        "objects_checked": ok_count,
650        "tampered": tampered,
651    }))
652}
653
654async fn checkpoint(
655    State(mgr): State<Manager>,
656    headers: HeaderMap,
657    AxPath(name): AxPath<String>,
658) -> Response {
659    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
660    let db = match mgr.get_db(&name).await {
661        None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
662        Some(db) => db,
663    };
664    let (seq, head) = db_seq_head(&db);
665    // v2 DAG is always "checkpointed" — content-addressed objects are inherently snapshotted
666    ok(json!({"ok": true, "head": head, "seq": seq}))
667}
668
669#[derive(Deserialize)]
670struct LogQuery { limit: Option<usize> }
671
672async fn get_log(
673    State(mgr): State<Manager>,
674    headers: HeaderMap,
675    AxPath(name): AxPath<String>,
676    AxQuery(q): AxQuery<LogQuery>,
677) -> Response {
678    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
679    let db = match mgr.get_db(&name).await {
680        None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
681        Some(db) => db,
682    };
683    let limit = q.limit.unwrap_or(50);
684    // v2: reconstruct log from objects (most recent first)
685    let mut log_entries: Vec<Value> = db.objects.all_hashes()
686        .filter_map(|h| db.objects.read(&h).ok())
687        .take(limit)
688        .map(|n| json!({
689            "seq": n.seq, "coll": n.coll, "id": n.id,
690            "hash": n.hash, "ts": n.ts, "op": "put"
691        }))
692        .collect();
693    log_entries.sort_by(|a, b|
694        b["seq"].as_u64().cmp(&a["seq"].as_u64())
695    );
696    log_entries.truncate(limit);
697    let (seq, head) = db_seq_head(&db);
698    ok(json!({"log": log_entries, "seq": seq, "head": head}))
699}
700
701// ── Live query subscriptions — POST /v1/databases/:name/subscribe ─────────────
702
703#[derive(Deserialize)]
704struct SubscribeBody { nql: String }
705
706async fn subscribe_query(
707    State(mgr): State<Manager>,
708    headers: HeaderMap,
709    AxPath(name): AxPath<String>,
710    Json(body): Json<SubscribeBody>,
711) -> Response {
712    if !mgr.check_auth(&headers) {
713        return err(StatusCode::UNAUTHORIZED, "unauthorized");
714    }
715    let db = match mgr.get_db(&name).await {
716        None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
717        Some(db) => db,
718    };
719
720    let (sub_id, rx) = mgr.subscribe(&name, body.nql.clone());
721
722    // Send the initial query result immediately as the first SSE event
723    if let Ok((rows, _)) = crate::nql::query(&db, &body.nql) {
724        let init = json!({
725            "sub_id": sub_id,
726            "db":     &name,
727            "nql":    &body.nql,
728            "rows":   rows,
729            "count":  rows.len(),
730            "event":  "initial",
731        });
732        // Update last_hash so we don't re-send this on the next write if unchanged
733        if let Some(mut entry) = mgr.subs.get_mut(&(name.clone(), sub_id)) {
734            let hash = format!("{:?}", rows);
735            entry.value_mut().1 = hash;
736        }
737        // Send the initial result through the channel
738        if let Some(entry) = mgr.subs.get(&(name.clone(), sub_id)) {
739            let _ = entry.value().2.send(init.to_string());
740        }
741    }
742
743    let stream = BroadcastStream::new(rx).filter_map(|msg| {
744        match msg {
745            Ok(line) => Some(Ok::<Event, std::convert::Infallible>(Event::default().data(line))),
746            Err(_)   => None,
747        }
748    });
749    Sse::new(stream)
750        .keep_alive(KeepAlive::default())
751        .into_response()
752}
753
754async fn unsubscribe_query(
755    State(mgr): State<Manager>,
756    headers: HeaderMap,
757    AxPath((name, sub_id)): AxPath<(String, u64)>,
758) -> Response {
759    if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
760    mgr.unsubscribe(&name, sub_id);
761    ok(json!({"ok": true, "sub_id": sub_id}))
762}
763
764// ── SSE log stream — GET /events ──────────────────────────────────────────────
765
766async fn log_events(State(mgr): State<Manager>) -> Sse<impl futures_core::Stream<Item = Result<Event, std::convert::Infallible>>> {
767    let rx = mgr.log_tx.subscribe();
768    let stream = BroadcastStream::new(rx).filter_map(|msg| {
769        match msg {
770            Ok(line) => Some(Ok::<Event, std::convert::Infallible>(Event::default().data(line))),
771            Err(_)   => None,  // lagged — skip
772        }
773    });
774    Sse::new(stream).keep_alive(KeepAlive::default())
775}
776
777// ── Router ────────────────────────────────────────────────────────────────────
778
779pub fn router(mgr: Manager) -> Router {
780    Router::new()
781        .route("/health",                                        get(health))
782        .route("/events",                                        get(log_events))
783        .route("/v1/databases",                                  get(list_databases).post(create_database))
784        .route("/v1/databases/:name",                            get(get_database).delete(drop_database))
785        .route("/v1/databases/:name/query",                      post(query_database))
786        .route("/v1/databases/:name/put",                        post(put_document))
787        .route("/v1/databases/:name/link",                       post(link_document))
788        .route("/v1/databases/:name/rows/:coll/:id",             delete(delete_document))
789        .route("/v1/databases/:name/batch",                      post(batch_operations))
790        .route("/v1/databases/:name/index",                      post(create_index))
791        .route("/v1/databases/:name/verify",                     get(verify_database))
792        .route("/v1/databases/:name/checkpoint",                 post(checkpoint))
793        .route("/v1/databases/:name/log",                        get(get_log))
794        .route("/v1/databases/:name/subscribe",                  post(subscribe_query))
795        .route("/v1/databases/:name/subscribe/:sub_id",          delete(unsubscribe_query))
796        .with_state(mgr)
797}
798
799/// Start the nedbd v2 server.
800pub async fn run(host: &str, port: u16, data_dir: &str, tmk: Option<[u8; 32]>, token: Option<String>, memory_mode: bool) -> anyhow::Result<()> {
801    let mgr = Manager::new(Path::new(data_dir), tmk, token, memory_mode);
802    mgr.open_all().await?;
803
804    let has_token = mgr.token.is_some();
805    let mgr_for_shutdown = mgr.clone();
806    let app = router(mgr);
807    let addr = format!("{}:{}", host, port).parse::<std::net::SocketAddr>()?;
808    let banner = format!(r#"
809810          ╱ ╲               N E D B  ·  DAG ENGINE  {}
811         ◆   ◆              ─────────────────────────────────────────────
812        ╱ ╲ ╱ ╲             content-addressed · tamper-evident · causal
813       ◆   ◆   ◆            bi-temporal · replay-protected · encrypted
814      ╱ ╲ ╱ ╲ ╱ ╲
815     ◆   ◆   ◆   ◆          © INTERCHAINED, LLC  ×  Vex (Claude Sonnet 4.6)
816    ╱ ╲ ╱ ╲ ╱ ╲ ╱ ╲         interchained.org   ·   hyperagent.com/refer/J2G6TCD7
817
818  ─────────────────────────────────────────────────────────────
819  listen   http://{}
820  data     {}
821  enc      {}
822  token    {}
823  memory   {}
824  ─────────────────────────────────────────────────────────────
825"#,
826        env!("CARGO_PKG_VERSION"),
827        addr,
828        data_dir,
829        if tmk.is_some() { "AES-256-GCM" } else { "off" },
830        if has_token { "on" } else { "off (set NEDBD_TOKEN to require auth)" },
831        if memory_mode { "yes — all data lost on exit (NEDBD_MEMORY=1)" } else { "no — durable DAG on disk" }
832    );
833    print!("{}", banner);
834
835    let listener = tokio::net::TcpListener::bind(addr).await?;
836
837    // ── Scheduled hourly checkpoint ────────────────────────────────────────────
838    // Flush MANIFEST every hour aligned to the system clock (top of the hour).
839    // Ensures warm-start data is always fresh even on long-running servers.
840    let mgr_hourly = mgr_for_shutdown.clone();
841    tokio::spawn(async move {
842        loop {
843            // Sleep until the next top-of-hour boundary
844            let now_secs = std::time::SystemTime::now()
845                .duration_since(std::time::UNIX_EPOCH)
846                .map(|d| d.as_secs()).unwrap_or(0);
847            let secs_into_hour = now_secs % 3600;
848            let sleep_secs = 3600 - secs_into_hour;
849            tokio::time::sleep(tokio::time::Duration::from_secs(sleep_secs)).await;
850            mgr_hourly.flush_all().await;
851            println!("  [nedbd] hourly checkpoint — manifests flushed");
852        }
853    });
854
855    // ── Graceful shutdown: SIGINT (Ctrl+C) + SIGTERM (systemctl stop) ─────────
856    let shutdown = async {
857        #[cfg(unix)]
858        {
859            use tokio::signal::unix::{signal, SignalKind};
860            let mut sigterm = signal(SignalKind::terminate()).unwrap();
861            let mut sigint  = signal(SignalKind::interrupt()).unwrap();
862            tokio::select! {
863                _ = sigterm.recv() => println!("  [nedbd] SIGTERM — flushing and exiting..."),
864                _ = sigint.recv()  => println!("  [nedbd] SIGINT  — flushing and exiting..."),
865            }
866        }
867        #[cfg(not(unix))]
868        {
869            tokio::signal::ctrl_c().await.ok();
870            println!("  [nedbd] shutting down — flushing manifests...");
871        }
872    };
873
874    axum::serve(listener, app)
875        .tcp_nodelay(true)
876        .with_graceful_shutdown(shutdown)
877        .await?;
878
879    // Final flush on exit
880    mgr_for_shutdown.flush_all().await;
881    println!("  [nedbd] goodbye");
882    Ok(())
883}