1use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::sync::atomic::AtomicU64;
13
14use axum::{
15 extract::{Path as AxPath, State, Query as AxQuery},
16 http::{HeaderMap, StatusCode},
17 response::{IntoResponse, Response, sse::{Event, KeepAlive, Sse}},
18 routing::{delete, get, post},
19 Json, Router,
20};
21use dashmap::DashMap;
22use serde::Deserialize;
23use serde_json::{json, Value};
24use tokio::sync::{broadcast, RwLock};
25use tokio_stream::wrappers::BroadcastStream;
26use tokio_stream::StreamExt as _;
27
28use crate::db::Db;
29use crate::nql;
30use crate::store::Node;
31
/// Capacity of the broadcast channel that carries server log lines (`Manager::log_tx`).
const LOG_CHANNEL_CAP: usize = 512;
/// Capacity of the broadcast channel created for each live-query subscription.
const SUB_CHANNEL_CAP: usize = 256;
36
/// Key of a live-query subscription: `(database name, subscription id)`.
type SubKey = (String, u64);
/// State of a live-query subscription:
/// `(NQL text, fingerprint of the rows last delivered, sender feeding the SSE stream)`.
type SubVal = (String, String, broadcast::Sender<String>);

/// Formats a message, prints it to stdout and forwards the same line to the
/// broadcast sender `$tx`.
///
/// The result of `send` is deliberately ignored, so a failed send never
/// interrupts the caller.
macro_rules! nlog {
    ($tx:expr, $($arg:tt)*) => {{
        let line = format!($($arg)*);
        println!("{}", line);
        let _ = $tx.send(line);
    }};
}
53
/// Shared server state passed to every HTTP handler.
///
/// Cloning is cheap: every piece of mutable state sits behind an `Arc`, so all
/// clones observe the same databases and subscriptions.
#[derive(Clone)]
pub struct Manager {
    /// Database registry plus server configuration, behind an async read/write lock.
    inner: Arc<RwLock<ManagerInner>>,
    /// Bearer token checked by `check_auth`; `None` disables authentication.
    pub token: Option<String>,
    /// Broadcast sender for server log lines (consumed by the `/events` stream).
    pub log_tx: broadcast::Sender<String>,
    /// Live-query subscriptions keyed by `(database name, subscription id)`.
    subs: Arc<DashMap<SubKey, SubVal>>,
    /// Counter handing out subscription ids; initialised to 1 in `Manager::new`.
    sub_ctr: Arc<AtomicU64>,
}
66
/// Lock-protected part of [`Manager`]: configuration and the open databases.
struct ManagerInner {
    /// Directory under which each database gets its own sub-directory.
    data_dir: PathBuf,
    /// Open databases by name.
    dbs: HashMap<String, Arc<Db>>,
    /// Optional 32-byte master key; when present a per-database key is derived
    /// from it via `Dek::from_tmk` using the database name.
    tmk: Option<[u8; 32]>,
    /// When true, databases are created with `Db::in_memory()` and `open_all`
    /// does not touch the disk.
    memory_mode: bool,
}
73
74impl Manager {
75 pub fn new(data_dir: &Path, tmk: Option<[u8; 32]>, token: Option<String>, memory_mode: bool) -> Self {
76 let (log_tx, _) = broadcast::channel(LOG_CHANNEL_CAP);
77 Self {
78 inner: Arc::new(RwLock::new(ManagerInner {
79 data_dir: data_dir.to_path_buf(),
80 dbs: HashMap::new(),
81 tmk,
82 memory_mode,
83 })),
84 token,
85 log_tx,
86 subs: Arc::new(DashMap::new()),
87 sub_ctr: Arc::new(AtomicU64::new(1)),
88 }
89 }
90
91 fn subscribe(&self, db: &str, nql: String) -> (u64, broadcast::Receiver<String>) {
93 use std::sync::atomic::Ordering;
94 let id = self.sub_ctr.fetch_add(1, Ordering::Relaxed);
95 let (tx, rx) = broadcast::channel(SUB_CHANNEL_CAP);
96 self.subs.insert((db.to_string(), id), (nql, String::new(), tx));
97 (id, rx)
98 }
99
100 fn unsubscribe(&self, db: &str, sub_id: u64) {
102 self.subs.remove(&(db.to_string(), sub_id));
103 }
104
105 fn notify_subscribers(&self, db: &str, db_arc: &Arc<crate::db::Db>) {
107 let keys: Vec<SubKey> = self.subs.iter()
108 .filter(|e| e.key().0 == db)
109 .map(|e| e.key().clone())
110 .collect();
111
112 for key in keys {
113 if let Some(mut entry) = self.subs.get_mut(&key) {
114 let (nql, last_hash, tx) = entry.value_mut();
115 let rows = match crate::nql::query(db_arc, nql) {
117 Ok((rows, _)) => rows,
118 Err(_) => continue,
119 };
120 let new_hash = format!("{:?}", rows.iter().map(|r| r.to_string()).collect::<Vec<_>>());
122 if new_hash == *last_hash { continue; }
123 *last_hash = new_hash;
124 let event = json!({
126 "sub_id": key.1,
127 "db": &key.0,
128 "nql": nql.as_str(),
129 "rows": rows,
130 "count": rows.len(),
131 });
132 let _ = tx.send(event.to_string());
133 }
134 }
135 }
136
137 pub async fn open_all(&self) -> anyhow::Result<()> {
139 let (data_dir, tmk, memory_mode) = {
140 let inner = self.inner.read().await;
141 (inner.data_dir.clone(), inner.tmk, inner.memory_mode)
142 };
143 if memory_mode { return Ok(()); }
145 if !data_dir.exists() {
146 std::fs::create_dir_all(&data_dir)?;
147 return Ok(());
148 }
149 let mut names = vec![];
150 for entry in std::fs::read_dir(&data_dir)? {
151 let entry = entry?;
152 if entry.file_type()?.is_dir() {
153 names.push(entry.file_name().to_string_lossy().to_string());
154 }
155 }
156 let log_tx = self.log_tx.clone();
157 let mut inner = self.inner.write().await;
158 for name in names {
159 let db_path = inner.data_dir.join(&name);
160 let dek = tmk.map(|k| crate::store::Dek::from_tmk(&k, name.as_bytes()));
161 match Db::open(&db_path, dek) {
162 Ok(db) => {
163 nlog!(log_tx, " [nedbd] opened database {:?}", name);
164 let db_arc = Arc::new(db);
165 Db::start_cold_scan(Arc::clone(&db_arc));
166 Db::start_manifest_ticker(Arc::clone(&db_arc), 1000);
168 inner.dbs.insert(name, db_arc);
169 }
170 Err(e) => nlog!(log_tx, " [nedbd] ERROR opening {:?}: {}", name, e),
171 }
172 }
173 Ok(())
174 }
175
176 async fn get_db(&self, name: &str) -> Option<Arc<Db>> {
177 self.inner.read().await.dbs.get(name).cloned()
178 }
179
180 async fn create_db(&self, name: &str) -> anyhow::Result<Arc<Db>> {
181 let (data_dir, tmk, memory_mode) = {
182 let inner = self.inner.read().await;
183 (inner.data_dir.clone(), inner.tmk, inner.memory_mode)
184 };
185 let db = if memory_mode {
186 Arc::new(Db::in_memory())
188 } else {
189 let db_path = data_dir.join(name);
190 let dek = tmk.map(|k| crate::store::Dek::from_tmk(&k, name.as_bytes()));
191 let db = Arc::new(Db::open(&db_path, dek)?);
192 Db::start_cold_scan(Arc::clone(&db));
193 Db::start_manifest_ticker(Arc::clone(&db), 1000);
194 db
195 };
196 self.inner.write().await.dbs.insert(name.to_string(), db.clone());
197 Ok(db)
198 }
199
200 async fn drop_db(&self, name: &str) -> bool {
201 let db = self.inner.write().await.dbs.remove(name);
202 if let Some(db) = db {
203 db.flush_manifest_if_dirty();
205 let data_dir = self.inner.read().await.data_dir.clone();
206 let _ = std::fs::remove_dir_all(data_dir.join(name));
207 true
208 } else {
209 false
210 }
211 }
212
213 pub async fn flush_all(&self) {
215 let inner = self.inner.read().await;
216 for db in inner.dbs.values() {
217 db.flush_all(); }
219 }
220
221 async fn names(&self) -> Vec<String> {
222 self.inner.read().await.dbs.keys().cloned().collect()
223 }
224
225 pub fn log(&self, msg: impl Into<String>) {
227 let line = msg.into();
228 println!("{}", line);
229 let _ = self.log_tx.send(line);
230 }
231
232 fn check_auth(&self, headers: &HeaderMap) -> bool {
233 match &self.token {
234 None => true,
235 Some(required) => {
236 if let Some(auth) = headers.get("authorization") {
237 if let Ok(s) = auth.to_str() {
238 return s == format!("Bearer {}", required);
239 }
240 }
241 false
242 }
243 }
244 }
245}
246
247fn err(status: StatusCode, msg: &str) -> Response {
250 (status, Json(json!({"error": msg}))).into_response()
251}
252
253fn ok(body: Value) -> Response {
254 (StatusCode::OK, Json(body)).into_response()
255}
256
257fn db_seq_head(db: &Db) -> (u64, String) {
261 let seq = db.seq.load(std::sync::atomic::Ordering::SeqCst);
262 let head = db.head();
263 (seq, head)
264}
265
266async fn health(State(mgr): State<Manager>) -> Response {
269 let names = mgr.names().await;
270 let inner = mgr.inner.read().await;
271 ok(json!({
272 "ok": true,
273 "service": "nedbd",
274 "version": env!("CARGO_PKG_VERSION"),
275 "engine": "dag",
276 "memory": inner.memory_mode,
277 "databases": names,
278 "encrypted": inner.tmk.is_some(),
279 }))
280}
281
282async fn list_databases(State(mgr): State<Manager>, headers: HeaderMap) -> Response {
283 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
284 let names = mgr.names().await;
285 let summaries: Vec<Value> = {
286 let inner = mgr.inner.read().await;
287 names.iter().map(|n| {
288 if let Some(db) = inner.dbs.get(n) {
289 let (seq, head) = db_seq_head(db);
290 json!({"name": n, "seq": seq, "head": head, "collections": db.id_index.collections()})
291 } else {
292 json!({"name": n})
293 }
294 }).collect()
295 };
296 ok(json!({"databases": summaries}))
297}
298
299#[derive(Deserialize)]
300struct CreateDbBody { name: String }
301
302async fn create_database(
303 State(mgr): State<Manager>,
304 headers: HeaderMap,
305 Json(body): Json<CreateDbBody>,
306) -> Response {
307 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
308 if body.name.is_empty() { return err(StatusCode::BAD_REQUEST, "name is required"); }
309 match mgr.create_db(&body.name).await {
310 Ok(db) => {
311 let (seq, head) = db_seq_head(&db);
312 (StatusCode::CREATED, Json(json!({"database": {"name": body.name, "seq": seq, "head": head}}))).into_response()
313 }
314 Err(e) => err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
315 }
316}
317
318async fn get_database(
319 State(mgr): State<Manager>,
320 headers: HeaderMap,
321 AxPath(name): AxPath<String>,
322) -> Response {
323 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
324 match mgr.get_db(&name).await {
325 None => err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
326 Some(db) => {
327 let (seq, head) = db_seq_head(&db);
328 ok(json!({"name": name, "seq": seq, "head": head, "collections": db.id_index.collections()}))
329 }
330 }
331}
332
333async fn drop_database(
334 State(mgr): State<Manager>,
335 headers: HeaderMap,
336 AxPath(name): AxPath<String>,
337) -> Response {
338 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
339 let dropped = mgr.drop_db(&name).await;
340 ok(json!({"dropped": dropped}))
341}
342
343#[derive(Deserialize)]
344struct QueryBody { nql: String }
345
346async fn query_database(
347 State(mgr): State<Manager>,
348 headers: HeaderMap,
349 AxPath(name): AxPath<String>,
350 Json(body): Json<QueryBody>,
351) -> Response {
352 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
353 let db = match mgr.get_db(&name).await {
354 None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
355 Some(db) => db,
356 };
357 if body.nql.trim().is_empty() {
358 return err(StatusCode::BAD_REQUEST, "nql is required");
359 }
360 match nql::query(&db, &body.nql) {
361 Ok((rows, count)) => {
362 let (seq, head) = db_seq_head(&db);
363 ok(json!({"rows": rows, "count": count, "seq": seq, "head": head}))
364 }
365 Err(e) => err(StatusCode::BAD_REQUEST, &format!("NQL error: {}", e)),
366 }
367}
368
369#[derive(Deserialize)]
370struct PutBody {
371 coll: String,
372 id: String,
373 doc: Value,
374 caused_by: Option<Vec<serde_json::Value>>,
375 valid_from: Option<String>,
376 valid_to: Option<String>,
377 #[allow(dead_code)] evidence: Option<String>,
378 #[allow(dead_code)] confidence: Option<f64>,
379 #[allow(dead_code)] client: Option<String>,
380 #[allow(dead_code)] nonce: Option<u64>,
381 #[allow(dead_code)] idem: Option<String>,
382}
383
/// JSON body of `POST /v1/databases/:name/link`; the three fields are passed
/// to `Db::link` in this order.
#[derive(Deserialize)]
struct LinkBody {
    /// Source end of the link.
    frm: String,
    /// Relation name.
    rel: String,
    /// Target end of the link.
    to: String,
}
390
391async fn put_document(
392 State(mgr): State<Manager>,
393 headers: HeaderMap,
394 AxPath(name): AxPath<String>,
395 Json(body): Json<PutBody>,
396) -> Response {
397 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
398 let db = match mgr.get_db(&name).await {
399 None => {
400 match mgr.create_db(&name).await {
402 Ok(db) => db,
403 Err(e) => return err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
404 }
405 }
406 Some(db) => db,
407 };
408 if !db.startup_ready.load(std::sync::atomic::Ordering::SeqCst) {
411 return err(StatusCode::SERVICE_UNAVAILABLE,
412 "database startup in progress — reads available, writes retry in a moment");
413 }
414 let caused_by: Vec<String> = body.caused_by.unwrap_or_default()
416 .into_iter()
417 .filter_map(|v| match v {
418 serde_json::Value::String(s) => Some(s),
419 serde_json::Value::Number(n) => {
420 n.as_u64().and_then(|seq| db.get_hash_by_seq(seq))
421 }
422 _ => None,
423 })
424 .collect();
425 let coll = body.coll.clone();
428 let id = body.id.clone();
429 let doc = body.doc.clone();
430 let vf = body.valid_from.clone();
431 let vt = body.valid_to.clone();
432 let db2 = Arc::clone(&db);
433 let result = tokio::task::spawn_blocking(move || {
434 db2.put(&coll, &id, doc, caused_by, vf, vt)
435 }).await;
436 match result {
437 Err(join_err) => err(StatusCode::INTERNAL_SERVER_ERROR, &join_err.to_string()),
438 Ok(Err(e)) => err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
439 Ok(Ok(node)) => {
440 let (seq, head) = db_seq_head(&db);
441 mgr.notify_subscribers(&name, &db);
442 ok(json!({"ok": true, "doc": node_to_response(&node), "seq": seq, "head": head}))
443 }
444 }
445}
446
447fn node_to_response(node: &Node) -> Value {
448 json!({
449 "_id": node.id,
450 "_hash": node.hash,
451 "_seq": node.seq,
452 "_coll": node.coll,
453 "data": node.data,
454 })
455}
456
457async fn link_document(
458 State(mgr): State<Manager>,
459 headers: HeaderMap,
460 AxPath(name): AxPath<String>,
461 Json(body): Json<LinkBody>,
462) -> Response {
463 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
464 let db = match mgr.get_db(&name).await {
465 None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
466 Some(db) => db,
467 };
468 if !db.startup_ready.load(std::sync::atomic::Ordering::SeqCst) {
469 return err(StatusCode::SERVICE_UNAVAILABLE, "startup scan in progress");
470 }
471 match db.link(&body.frm, &body.rel, &body.to) {
472 Ok(()) => {
473 let (seq, head) = db_seq_head(&db);
474 ok(json!({"ok": true, "frm": body.frm, "rel": body.rel, "to": body.to, "seq": seq, "head": head}))
475 }
476 Err(e) => err(StatusCode::BAD_REQUEST, &e.to_string()),
477 }
478}
479
480async fn delete_document(
481 State(mgr): State<Manager>,
482 headers: HeaderMap,
483 AxPath((name, coll, id)): AxPath<(String, String, String)>,
484) -> Response {
485 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
486 let db = match mgr.get_db(&name).await {
487 None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
488 Some(db) => db,
489 };
490 let existed = match db.delete(&coll, &id) {
493 Ok(v) => v,
494 Err(e) => return err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
495 };
496 let (seq, head) = db_seq_head(&db);
497 ok(json!({"ok": existed, "seq": seq, "head": head}))
498}
499
500#[derive(Deserialize)]
501struct BatchOp {
502 op: String,
503 coll: Option<String>,
504 id: Option<String>,
505 doc: Option<Value>,
506 caused_by: Option<Vec<serde_json::Value>>,
507}
508#[derive(Deserialize)]
509struct BatchBody { ops: Vec<BatchOp> }
510
511async fn batch_operations(
512 State(mgr): State<Manager>,
513 headers: HeaderMap,
514 AxPath(name): AxPath<String>,
515 Json(body): Json<BatchBody>,
516) -> Response {
517 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
518 let db = match mgr.get_db(&name).await {
519 None => match mgr.create_db(&name).await {
520 Ok(db) => db,
521 Err(e) => return err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
522 },
523 Some(db) => db,
524 };
525
526 if !db.startup_ready.load(std::sync::atomic::Ordering::SeqCst) {
527 return err(StatusCode::SERVICE_UNAVAILABLE,
528 "database startup in progress — reads available, writes retry in a moment");
529 }
530
531 let mut put_ops = vec![];
535 let mut del_ops: Vec<(String, String)> = vec![];
536 let mut op_order: Vec<(&str, usize)> = vec![]; for op in &body.ops {
539 let t = op.op.to_lowercase();
540 match t.as_str() {
541 "put" => {
542 let caused_by: Vec<String> = op.caused_by.clone().unwrap_or_default()
544 .into_iter()
545 .filter_map(|v| match v {
546 serde_json::Value::String(s) => Some(s),
547 serde_json::Value::Number(n) => {
548 n.as_u64().and_then(|seq| db.get_hash_by_seq(seq))
549 }
550 _ => None,
551 })
552 .collect();
553 op_order.push(("put", put_ops.len()));
554 put_ops.push((
555 op.coll.clone().unwrap_or_default(),
556 op.id.clone().unwrap_or_default(),
557 op.doc.clone().unwrap_or(json!({})),
558 caused_by,
559 None::<String>,
560 None::<String>,
561 ));
562 }
563 "del" | "delete" => {
564 op_order.push(("del", del_ops.len()));
565 del_ops.push((
566 op.coll.clone().unwrap_or_default(),
567 op.id.clone().unwrap_or_default(),
568 ));
569 }
570 _ => { op_order.push(("unknown", 0)); }
571 }
572 }
573
574 let put_results = if put_ops.is_empty() {
576 vec![]
577 } else {
578 match db.put_batch(put_ops) {
579 Ok(nodes) => nodes.into_iter().map(|n| json!({"op":"put","id":n.id,"seq":n.seq,"hash":n.hash})).collect(),
580 Err(e) => return err(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
581 }
582 };
583
584 let del_results: Vec<serde_json::Value> = del_ops.iter().map(|(coll, id)| {
586 match db.delete(coll, id) {
587 Ok(existed) => json!({"op":"del","id":id,"ok":existed}),
588 Err(e) => json!({"op":"del","id":id,"error":e.to_string()}),
589 }
590 }).collect();
591
592 let mut results = vec![];
594 for (kind, idx) in &op_order {
595 let r = match *kind {
596 "put" => put_results.get(*idx).cloned().unwrap_or(json!({"op":"put","error":"missing"})),
597 "del" => del_results.get(*idx).cloned().unwrap_or(json!({"op":"del","error":"missing"})),
598 _ => json!({"op": kind, "error": "unknown op"}),
599 };
600 results.push(r);
601 }
602 let (seq, head) = db_seq_head(&db);
603 mgr.notify_subscribers(&name, &db);
605 ok(json!({"results": results, "count": results.len(), "seq": seq, "head": head}))
606}
607
608#[derive(Deserialize)]
609struct IndexBody { coll: String, field: String, kind: Option<String> }
610
611async fn create_index(
612 State(mgr): State<Manager>,
613 headers: HeaderMap,
614 AxPath(name): AxPath<String>,
615 Json(body): Json<IndexBody>,
616) -> Response {
617 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
618 let db = match mgr.get_db(&name).await {
619 None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
620 Some(db) => db,
621 };
622 let kind = body.kind.as_deref().unwrap_or("eq");
623 match kind {
624 "sorted" | "eq" => {
625 db.create_sorted_index(&body.coll, &body.field);
626 ok(json!({"ok": true, "coll": body.coll, "field": body.field, "kind": kind}))
627 }
628 _ => err(StatusCode::BAD_REQUEST, &format!("unknown index kind: {}", kind)),
629 }
630}
631
632async fn verify_database(
633 State(mgr): State<Manager>,
634 headers: HeaderMap,
635 AxPath(name): AxPath<String>,
636) -> Response {
637 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
638 let db = match mgr.get_db(&name).await {
639 None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
640 Some(db) => db,
641 };
642 let (ok_count, tampered) = db.verify();
643 let (seq, head) = db_seq_head(&db);
644 ok(json!({
645 "ok": tampered.is_empty(),
646 "seq": seq,
647 "head": head,
648 "tamper_evident": true,
649 "objects_checked": ok_count,
650 "tampered": tampered,
651 }))
652}
653
654async fn checkpoint(
655 State(mgr): State<Manager>,
656 headers: HeaderMap,
657 AxPath(name): AxPath<String>,
658) -> Response {
659 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
660 let db = match mgr.get_db(&name).await {
661 None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
662 Some(db) => db,
663 };
664 let (seq, head) = db_seq_head(&db);
665 ok(json!({"ok": true, "head": head, "seq": seq}))
667}
668
669#[derive(Deserialize)]
670struct LogQuery { limit: Option<usize> }
671
672async fn get_log(
673 State(mgr): State<Manager>,
674 headers: HeaderMap,
675 AxPath(name): AxPath<String>,
676 AxQuery(q): AxQuery<LogQuery>,
677) -> Response {
678 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
679 let db = match mgr.get_db(&name).await {
680 None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
681 Some(db) => db,
682 };
683 let limit = q.limit.unwrap_or(50);
684 let mut log_entries: Vec<Value> = db.objects.all_hashes()
686 .filter_map(|h| db.objects.read(&h).ok())
687 .take(limit)
688 .map(|n| json!({
689 "seq": n.seq, "coll": n.coll, "id": n.id,
690 "hash": n.hash, "ts": n.ts, "op": "put"
691 }))
692 .collect();
693 log_entries.sort_by(|a, b|
694 b["seq"].as_u64().cmp(&a["seq"].as_u64())
695 );
696 log_entries.truncate(limit);
697 let (seq, head) = db_seq_head(&db);
698 ok(json!({"log": log_entries, "seq": seq, "head": head}))
699}
700
701#[derive(Deserialize)]
704struct SubscribeBody { nql: String }
705
706async fn subscribe_query(
707 State(mgr): State<Manager>,
708 headers: HeaderMap,
709 AxPath(name): AxPath<String>,
710 Json(body): Json<SubscribeBody>,
711) -> Response {
712 if !mgr.check_auth(&headers) {
713 return err(StatusCode::UNAUTHORIZED, "unauthorized");
714 }
715 let db = match mgr.get_db(&name).await {
716 None => return err(StatusCode::NOT_FOUND, &format!("database not found: {}", name)),
717 Some(db) => db,
718 };
719
720 let (sub_id, rx) = mgr.subscribe(&name, body.nql.clone());
721
722 if let Ok((rows, _)) = crate::nql::query(&db, &body.nql) {
724 let init = json!({
725 "sub_id": sub_id,
726 "db": &name,
727 "nql": &body.nql,
728 "rows": rows,
729 "count": rows.len(),
730 "event": "initial",
731 });
732 if let Some(mut entry) = mgr.subs.get_mut(&(name.clone(), sub_id)) {
734 let hash = format!("{:?}", rows);
735 entry.value_mut().1 = hash;
736 }
737 if let Some(entry) = mgr.subs.get(&(name.clone(), sub_id)) {
739 let _ = entry.value().2.send(init.to_string());
740 }
741 }
742
743 let stream = BroadcastStream::new(rx).filter_map(|msg| {
744 match msg {
745 Ok(line) => Some(Ok::<Event, std::convert::Infallible>(Event::default().data(line))),
746 Err(_) => None,
747 }
748 });
749 Sse::new(stream)
750 .keep_alive(KeepAlive::default())
751 .into_response()
752}
753
754async fn unsubscribe_query(
755 State(mgr): State<Manager>,
756 headers: HeaderMap,
757 AxPath((name, sub_id)): AxPath<(String, u64)>,
758) -> Response {
759 if !mgr.check_auth(&headers) { return err(StatusCode::UNAUTHORIZED, "unauthorized"); }
760 mgr.unsubscribe(&name, sub_id);
761 ok(json!({"ok": true, "sub_id": sub_id}))
762}
763
764async fn log_events(State(mgr): State<Manager>) -> Sse<impl futures_core::Stream<Item = Result<Event, std::convert::Infallible>>> {
767 let rx = mgr.log_tx.subscribe();
768 let stream = BroadcastStream::new(rx).filter_map(|msg| {
769 match msg {
770 Ok(line) => Some(Ok::<Event, std::convert::Infallible>(Event::default().data(line))),
771 Err(_) => None, }
773 });
774 Sse::new(stream).keep_alive(KeepAlive::default())
775}
776
777pub fn router(mgr: Manager) -> Router {
780 Router::new()
781 .route("/health", get(health))
782 .route("/events", get(log_events))
783 .route("/v1/databases", get(list_databases).post(create_database))
784 .route("/v1/databases/:name", get(get_database).delete(drop_database))
785 .route("/v1/databases/:name/query", post(query_database))
786 .route("/v1/databases/:name/put", post(put_document))
787 .route("/v1/databases/:name/link", post(link_document))
788 .route("/v1/databases/:name/rows/:coll/:id", delete(delete_document))
789 .route("/v1/databases/:name/batch", post(batch_operations))
790 .route("/v1/databases/:name/index", post(create_index))
791 .route("/v1/databases/:name/verify", get(verify_database))
792 .route("/v1/databases/:name/checkpoint", post(checkpoint))
793 .route("/v1/databases/:name/log", get(get_log))
794 .route("/v1/databases/:name/subscribe", post(subscribe_query))
795 .route("/v1/databases/:name/subscribe/:sub_id", delete(unsubscribe_query))
796 .with_state(mgr)
797}
798
799pub async fn run(host: &str, port: u16, data_dir: &str, tmk: Option<[u8; 32]>, token: Option<String>, memory_mode: bool) -> anyhow::Result<()> {
801 let mgr = Manager::new(Path::new(data_dir), tmk, token, memory_mode);
802 mgr.open_all().await?;
803
804 let has_token = mgr.token.is_some();
805 let mgr_for_shutdown = mgr.clone();
806 let app = router(mgr);
807 let addr = format!("{}:{}", host, port).parse::<std::net::SocketAddr>()?;
808 let banner = format!(r#"
809 ◆
810 ╱ ╲ N E D B · DAG ENGINE {}
811 ◆ ◆ ─────────────────────────────────────────────
812 ╱ ╲ ╱ ╲ content-addressed · tamper-evident · causal
813 ◆ ◆ ◆ bi-temporal · replay-protected · encrypted
814 ╱ ╲ ╱ ╲ ╱ ╲
815 ◆ ◆ ◆ ◆ © INTERCHAINED, LLC × Vex (Claude Sonnet 4.6)
816 ╱ ╲ ╱ ╲ ╱ ╲ ╱ ╲ interchained.org · hyperagent.com/refer/J2G6TCD7
817
818 ─────────────────────────────────────────────────────────────
819 listen http://{}
820 data {}
821 enc {}
822 token {}
823 memory {}
824 ─────────────────────────────────────────────────────────────
825"#,
826 env!("CARGO_PKG_VERSION"),
827 addr,
828 data_dir,
829 if tmk.is_some() { "AES-256-GCM" } else { "off" },
830 if has_token { "on" } else { "off (set NEDBD_TOKEN to require auth)" },
831 if memory_mode { "yes — all data lost on exit (NEDBD_MEMORY=1)" } else { "no — durable DAG on disk" }
832 );
833 print!("{}", banner);
834
835 let listener = tokio::net::TcpListener::bind(addr).await?;
836
837 let mgr_hourly = mgr_for_shutdown.clone();
841 tokio::spawn(async move {
842 loop {
843 let now_secs = std::time::SystemTime::now()
845 .duration_since(std::time::UNIX_EPOCH)
846 .map(|d| d.as_secs()).unwrap_or(0);
847 let secs_into_hour = now_secs % 3600;
848 let sleep_secs = 3600 - secs_into_hour;
849 tokio::time::sleep(tokio::time::Duration::from_secs(sleep_secs)).await;
850 mgr_hourly.flush_all().await;
851 println!(" [nedbd] hourly checkpoint — manifests flushed");
852 }
853 });
854
855 let shutdown = async {
857 #[cfg(unix)]
858 {
859 use tokio::signal::unix::{signal, SignalKind};
860 let mut sigterm = signal(SignalKind::terminate()).unwrap();
861 let mut sigint = signal(SignalKind::interrupt()).unwrap();
862 tokio::select! {
863 _ = sigterm.recv() => println!(" [nedbd] SIGTERM — flushing and exiting..."),
864 _ = sigint.recv() => println!(" [nedbd] SIGINT — flushing and exiting..."),
865 }
866 }
867 #[cfg(not(unix))]
868 {
869 tokio::signal::ctrl_c().await.ok();
870 println!(" [nedbd] shutting down — flushing manifests...");
871 }
872 };
873
874 axum::serve(listener, app)
875 .tcp_nodelay(true)
876 .with_graceful_shutdown(shutdown)
877 .await?;
878
879 mgr_for_shutdown.flush_all().await;
881 println!(" [nedbd] goodbye");
882 Ok(())
883}