use futures::Stream;
use std::fs;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::thread;
use tokio::sync::{mpsc, oneshot, watch};
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::{Identity, ServerTlsConfig};
use tonic::{Request, Response, Status, transport::Server};

use umadb_core::db::{
    DEFAULT_DB_FILENAME, DEFAULT_PAGE_SIZE, UmaDB, is_request_idempotent, read_conditional,
};
use umadb_core::mvcc::Mvcc;
use umadb_dcb::{DCBAppendCondition, DCBError, DCBEvent, DCBQuery, DCBResult, DCBSequencedEvent};

use tokio::runtime::Runtime;
use umadb_core::common::Position;
use umadb_proto::{
    AppendRequestProto, AppendResponseProto, HeadRequestProto, HeadResponseProto, ReadRequestProto,
    ReadResponseProto, SequencedEventProto, UmaDbService, UmaDbServiceServer,
    status_from_dcb_error,
};
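
// Tuning knobs for the gRPC layer: the writer thread keeps draining queued
// append requests into a single commit until roughly APPEND_BATCH_MAX_EVENTS
// events have accumulated, and read responses are chunked into batches of
// `batch_size` events, clamped to 1..=READ_RESPONSE_BATCH_SIZE_MAX
// (READ_RESPONSE_BATCH_SIZE_DEFAULT when the client does not specify one).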
const APPEND_BATCH_MAX_EVENTS: usize = 2000;
const READ_RESPONSE_BATCH_SIZE_DEFAULT: u32 = 100;
const READ_RESPONSE_BATCH_SIZE_MAX: u32 = 5000;

#[derive(Clone, Debug)]
pub struct ServerTlsOptions {
    pub cert_pem: Vec<u8>,
    pub key_pem: Vec<u8>,
}
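
/// Builds the tonic `Server` with HTTP/2 keepalives, enlarged flow-control
/// windows, TCP_NODELAY and a per-connection concurrency limit, optionally
/// configured with the given TLS identity.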
fn build_server_builder_with_options(tls: Option<ServerTlsOptions>) -> Server {
    use std::time::Duration;
    let mut server_builder = Server::builder()
        .http2_keepalive_interval(Some(Duration::from_secs(5)))
        .http2_keepalive_timeout(Some(Duration::from_secs(10)))
        .initial_stream_window_size(Some(4 * 1024 * 1024))
        .initial_connection_window_size(Some(8 * 1024 * 1024))
        .tcp_nodelay(true)
        .concurrency_limit_per_connection(1024);

    if let Some(opts) = tls {
        let identity = Identity::from_pem(opts.cert_pem, opts.key_pem);
        server_builder = server_builder
            .tls_config(ServerTlsConfig::new().identity(identity))
            .expect("failed to apply TLS config");
    }

    server_builder
}
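
/// Starts an insecure (plaintext) UmaDB gRPC server on `addr`, serving the
/// database at `path` until `shutdown_rx` fires.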
pub async fn start_server<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None).await
}
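
/// Starts a TLS-enabled UmaDB gRPC server using the given PEM-encoded
/// certificate and private key.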
pub async fn start_server_secure<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    cert_pem: Vec<u8>,
    key_pem: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error>> {
    let tls = ServerTlsOptions { cert_pem, key_pem };
    start_server_internal(path, addr, shutdown_rx, Some(tls)).await
}
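
/// Starts a TLS-enabled UmaDB gRPC server, reading the certificate and key
/// from the given file paths.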
pub async fn start_server_secure_from_files<
    P: AsRef<Path> + Send + 'static,
    CP: AsRef<Path>,
    KP: AsRef<Path>,
>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    cert_path: CP,
    key_path: KP,
) -> Result<(), Box<dyn std::error::Error>> {
    let cert_path_ref = cert_path.as_ref();
    let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
        format!(
            "Failed to read TLS certificate file '{}': {}",
            cert_path_ref.display(),
            e
        )
        .into()
    })?;

    let key_path_ref = key_path.as_ref();
    let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
        format!(
            "Failed to read TLS key file '{}': {}",
            key_path_ref.display(),
            e
        )
        .into()
    })?;
    start_server_secure(path, addr, shutdown_rx, cert_pem, key_pem).await
}
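
/// Shared implementation for the public `start_server*` entry points: binds
/// the listener, registers the gRPC health service, and serves until the
/// shutdown signal is received.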
async fn start_server_internal<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    tls: Option<ServerTlsOptions>,
) -> Result<(), Box<dyn std::error::Error>> {
    let addr = addr.parse()?;
    let (srv_shutdown_tx, srv_shutdown_rx) = watch::channel(false);
    let server = UmaDBServer::new(path, srv_shutdown_rx)?;
    if tls.is_some() {
        println!("UmaDB server (TLS) listening on {addr}");
    } else {
        println!("UmaDB server (insecure) listening on {addr}");
    }

    let mut server_builder = build_server_builder_with_options(tls);

    // Expose the standard gRPC health service and mark both the overall server
    // and the UmaDB service as serving.
    use tonic_health::ServingStatus;
    let (health_reporter, health_service) = tonic_health::server::health_reporter();
    health_reporter
        .set_service_status("", ServingStatus::Serving)
        .await;
    health_reporter
        .set_service_status("umadb.UmaDBService", ServingStatus::Serving)
        .await;
    let health_reporter_for_shutdown = health_reporter.clone();

    server_builder
        .add_service(health_service)
        .add_service(server.into_service())
        .serve_with_shutdown(addr, async move {
            let _ = shutdown_rx.await;
            // Mark the services as not serving before signalling open streams to stop.
            let _ = health_reporter_for_shutdown
                .set_service_status("", ServingStatus::NotServing)
                .await;
            let _ = health_reporter_for_shutdown
                .set_service_status("umadb.UmaDBService", ServingStatus::NotServing)
                .await;
            let _ = srv_shutdown_tx.send(true);
            println!("UmaDB server shutdown complete");
        })
        .await?;

    Ok(())
}
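
/// gRPC service wrapper: pairs the shared `RequestHandler` with a watch
/// channel used to stop long-lived read subscriptions on shutdown.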
pub struct UmaDBServer {
    request_handler: RequestHandler,
    shutdown_watch_rx: watch::Receiver<bool>,
}

impl UmaDBServer {
    pub fn new<P: AsRef<Path> + Send + 'static>(
        path: P,
        shutdown_rx: watch::Receiver<bool>,
    ) -> std::io::Result<Self> {
        let request_handler = RequestHandler::new(path)?;
        Ok(Self {
            request_handler,
            shutdown_watch_rx: shutdown_rx,
        })
    }

    pub fn into_service(self) -> UmaDbServiceServer<Self> {
        UmaDbServiceServer::new(self)
    }
}
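
// gRPC handlers. `read` streams batches of events (optionally as a live
// subscription); `append` and `head` are unary calls that delegate to the
// RequestHandler.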
#[tonic::async_trait]
impl UmaDbService for UmaDBServer {
    type ReadStream =
        Pin<Box<dyn Stream<Item = Result<ReadResponseProto, Status>> + Send + 'static>>;

    async fn read(
        &self,
        request: Request<ReadRequestProto>,
    ) -> Result<Response<Self::ReadStream>, Status> {
        let read_request = request.into_inner();

        let mut query: Option<DCBQuery> = read_request.query.map(|q| q.into());
        let start = read_request.start;
        let backwards = read_request.backwards.unwrap_or(false);
        let limit = read_request.limit;
        let batch_size = read_request
            .batch_size
            .unwrap_or(READ_RESPONSE_BATCH_SIZE_DEFAULT)
            .clamp(1, READ_RESPONSE_BATCH_SIZE_MAX);
        let subscribe = read_request.subscribe.unwrap_or(false);

        let (tx, rx) = mpsc::channel(2048);
        let request_handler = self.request_handler.clone();
        let mut shutdown_watch_rx = self.shutdown_watch_rx.clone();

        tokio::spawn(async move {
            let query_clone = query.take();
            let mut next_start = start;
            let mut sent_any = false;
            let mut remaining_limit = limit.unwrap_or(u32::MAX);
            let mut head_rx = request_handler.watch_head();
            // For non-subscription reads, capture the head up front so the stream
            // terminates at a fixed point even if writers keep appending.
            let captured_head = if !subscribe {
                request_handler.head().await.unwrap_or(None)
            } else {
                None
            };
            loop {
                if subscribe {
                    if tx.is_closed() {
                        break;
                    }
                    if *shutdown_watch_rx.borrow() {
                        break;
                    }
                }
                let read_limit = remaining_limit.min(batch_size);
                if subscribe && limit.is_some() && remaining_limit == 0 {
                    break;
                }
                match request_handler
                    .read(query_clone.clone(), next_start, backwards, Some(read_limit))
                    .await
                {
                    Ok((dcb_sequenced_events, head)) => {
                        let original_len = dcb_sequenced_events.len();

                        // Drop anything beyond the captured head; seeing fewer events
                        // than were read means we have reached it.
                        let sequenced_event_protos: Vec<SequencedEventProto> = dcb_sequenced_events
                            .into_iter()
                            .filter(|e| {
                                if let Some(h) = captured_head {
                                    e.position <= h
                                } else {
                                    true
                                }
                            })
                            .map(SequencedEventProto::from)
                            .collect();

                        let reached_captured_head = if captured_head.is_some() {
                            sequenced_event_protos.len() < original_len
                        } else {
                            false
                        };

                        let last_event_position = sequenced_event_protos.last().map(|e| e.position);
                        let head_to_send = if subscribe {
                            head
                        } else if limit.is_none() {
                            captured_head
                        } else {
                            last_event_position.or(head)
                        };

                        if sequenced_event_protos.is_empty() {
                            if !sent_any {
                                let response = ReadResponseProto {
                                    events: vec![],
                                    head: head_to_send,
                                };
                                let _ = tx.send(Ok(response)).await;
                            }
                            if subscribe {
                                // Nothing to send yet: wait until the head advances to
                                // next_start or beyond, or until shutdown / disconnect.
                                loop {
                                    if tx.is_closed() {
                                        break;
                                    }
                                    let current_head = *head_rx.borrow();
                                    if current_head
                                        .map(|h| h >= next_start.unwrap_or(1))
                                        .unwrap_or(false)
                                    {
                                        break;
                                    }
                                    tokio::select! {
                                        res = head_rx.changed() => {
                                            if res.is_err() { break; }
                                        }
                                        res2 = shutdown_watch_rx.changed() => {
                                            if res2.is_ok() {
                                                if *shutdown_watch_rx.borrow() { break; }
                                            } else {
                                                break;
                                            }
                                        }
                                    }
                                }
                                continue;
                            }
                            break;
                        }

                        let sent_count = sequenced_event_protos.len() as u32;

                        let response = ReadResponseProto {
                            events: sequenced_event_protos,
                            head: head_to_send,
                        };

                        if tx.send(Ok(response)).await.is_err() {
                            break;
                        }
                        sent_any = true;

                        next_start =
                            last_event_position.map(|p| if !backwards { p + 1 } else { p - 1 });

                        if reached_captured_head && !subscribe {
                            break;
                        }

                        if limit.is_some() {
                            if remaining_limit <= sent_count {
                                remaining_limit = 0;
                            } else {
                                remaining_limit -= sent_count;
                            }
                            if remaining_limit == 0 {
                                break;
                            }
                        }

                        tokio::task::yield_now().await;
                    }
                    Err(e) => {
                        let _ = tx.send(Err(status_from_dcb_error(&e))).await;
                        break;
                    }
                }
            }
        });

        Ok(Response::new(
            Box::pin(ReceiverStream::new(rx)) as Self::ReadStream
        ))
    }
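
    /// Unary append: converts the protobuf events, then delegates to the
    /// RequestHandler, which serializes all writes through the writer thread.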
    async fn append(
        &self,
        request: Request<AppendRequestProto>,
    ) -> Result<Response<AppendResponseProto>, Status> {
        let req = request.into_inner();

        let events: Vec<DCBEvent> = match req.events.into_iter().map(|e| e.try_into()).collect() {
            Ok(events) => events,
            Err(e) => {
                return Err(status_from_dcb_error(&e));
            }
        };
        let condition = req.condition.map(|c| c.into());

        match self.request_handler.append(events, condition).await {
            Ok(position) => Ok(Response::new(AppendResponseProto { position })),
            Err(e) => Err(status_from_dcb_error(&e)),
        }
    }

    async fn head(
        &self,
        _request: Request<HeadRequestProto>,
    ) -> Result<Response<HeadResponseProto>, Status> {
        match self.request_handler.head().await {
            Ok(position) => Ok(Response::new(HeadResponseProto { position })),
            Err(e) => Err(status_from_dcb_error(&e)),
        }
    }
}
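
/// Requests handled by the dedicated writer thread: appends carry a oneshot
/// channel for their result; `Shutdown` stops the writer loop.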
enum WriterRequest {
    Append {
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        response_tx: oneshot::Sender<DCBResult<u64>>,
    },
    Shutdown,
}
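
/// Shared handle used by all gRPC requests: reads go straight to the MVCC
/// store, appends are forwarded to the single writer thread, and the watch
/// channel broadcasts the latest committed head to read subscriptions.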
struct RequestHandler {
    mvcc: Arc<Mvcc>,
    head_watch_tx: watch::Sender<Option<u64>>,
    writer_request_tx: mpsc::Sender<WriterRequest>,
}

impl RequestHandler {
    fn new<P: AsRef<Path> + Send + 'static>(path: P) -> std::io::Result<Self> {
        let (request_tx, mut request_rx) = mpsc::channel::<WriterRequest>(1024);

        let p = path.as_ref();
        let file_path = if p.is_dir() {
            p.join(DEFAULT_DB_FILENAME)
        } else {
            p.to_path_buf()
        };
        let mvcc = Arc::new(
            Mvcc::new(&file_path, DEFAULT_PAGE_SIZE, false)
                .map_err(|e| std::io::Error::other(format!("Failed to init LMDB: {e:?}")))?,
        );

        // Seed the head watch channel with the last committed position (if any).
        let init_head = {
            let (_, header) = mvcc
                .get_latest_header()
                .map_err(|e| std::io::Error::other(format!("Failed to read header: {e:?}")))?;
            let last = header.next_position.0.saturating_sub(1);
            if last == 0 { None } else { Some(last) }
        };
        let (head_tx, _head_rx) = watch::channel::<Option<u64>>(init_head);

        // Dedicated writer thread: all appends are serialized here and committed in batches.
        let mvcc_for_writer = mvcc.clone();
        let head_tx_writer = head_tx.clone();
        thread::spawn(move || {
            let db = UmaDB::from_arc(mvcc_for_writer);

            let rt = Runtime::new().unwrap();

            rt.block_on(async {
                while let Some(request) = request_rx.recv().await {
                    match request {
                        WriterRequest::Append {
                            events,
                            condition,
                            response_tx,
                        } => {
                            let mut responders: Vec<oneshot::Sender<DCBResult<u64>>> = Vec::new();
                            let mut total_events = events.len();
                            let mvcc = &db.mvcc;
                            let mut writer = match mvcc.writer() {
                                Ok(writer) => writer,
                                Err(err) => {
                                    let _ = response_tx.send(Err(err));
                                    continue;
                                }
                            };
                            responders.push(response_tx);

                            let mut results: Vec<DCBResult<u64>> = Vec::new();

                            UmaDB::process_append_request(
                                events,
                                condition,
                                false,
                                mvcc,
                                &mut writer,
                                &mut results,
                            );

                            // Opportunistically drain queued append requests into the same
                            // write transaction, up to roughly APPEND_BATCH_MAX_EVENTS events.
                            loop {
                                if total_events >= APPEND_BATCH_MAX_EVENTS {
                                    break;
                                }
                                match request_rx.try_recv() {
                                    Ok(WriterRequest::Append {
                                        events,
                                        condition,
                                        response_tx,
                                    }) => {
                                        let ev_len = events.len();
                                        UmaDB::process_append_request(
                                            events,
                                            condition,
                                            false,
                                            mvcc,
                                            &mut writer,
                                            &mut results,
                                        );
                                        responders.push(response_tx);
                                        total_events += ev_len;
                                    }
                                    Ok(WriterRequest::Shutdown) => {
                                        break;
                                    }
                                    Err(mpsc::error::TryRecvError::Empty) => break,
                                    Err(mpsc::error::TryRecvError::Disconnected) => break,
                                }
                            }
                            // Commit the whole batch at once; on success fan the per-request
                            // results back out, otherwise report the commit error to everyone.
                            let batch_result = match mvcc.commit(&mut writer) {
                                Ok(_) => Ok(results),
                                Err(err) => Err(err),
                            };

                            match batch_result {
                                Ok(results) => {
                                    let mut max_ok: Option<u64> = None;
                                    for (res, tx) in results.into_iter().zip(responders.into_iter())
                                    {
                                        if let Ok(v) = &res {
                                            max_ok = Some(max_ok.map_or(*v, |m| m.max(*v)));
                                        }
                                        let _ = tx.send(res);
                                    }
                                    if let Some(h) = max_ok {
                                        let _ = head_tx_writer.send(Some(h));
                                    }
                                }
                                Err(e) => {
                                    for tx in responders {
                                        let _ = tx.send(Err(clone_dcb_error(&e)));
                                    }
                                }
                            }
                        }
                        WriterRequest::Shutdown => {
                            break;
                        }
                    }
                }
            });
        });

        Ok(Self {
            mvcc,
            head_watch_tx: head_tx,
            writer_request_tx: request_tx,
        })
    }
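
    /// Snapshot read against the current MVCC reader. Returns the matching
    /// events plus the head to report: the last committed position when no
    /// limit is given, otherwise the position of the last returned event.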
    async fn read(
        &self,
        query: Option<DCBQuery>,
        start: Option<u64>,
        backwards: bool,
        limit: Option<u32>,
    ) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
        let reader = self.mvcc.reader()?;
        let last_committed_position = reader.next_position.0.saturating_sub(1);

        let q = query.unwrap_or(DCBQuery { items: vec![] });
        let start_position = start.map(Position);

        let events = read_conditional(
            &self.mvcc,
            &std::collections::HashMap::new(),
            reader.events_tree_root_id,
            reader.tags_tree_root_id,
            q,
            start_position,
            backwards,
            limit,
            false,
        )
        .map_err(|e| DCBError::Corruption(format!("{e}")))?;

        let head = if limit.is_none() {
            if last_committed_position == 0 {
                None
            } else {
                Some(last_committed_position)
            }
        } else {
            events.last().map(|e| e.position)
        };

        Ok((events, head))
    }

    async fn head(&self) -> DCBResult<Option<u64>> {
        let (_, header) = self
            .mvcc
            .get_latest_header()
            .map_err(|e| DCBError::Corruption(format!("{e}")))?;
        let last = header.next_position.0.saturating_sub(1);
        if last == 0 { Ok(None) } else { Ok(Some(last)) }
    }
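
    /// Append with an optional condition. Before queueing the write, the
    /// condition is checked against a snapshot: if the matching events were
    /// produced by this very request the append is treated as idempotent and
    /// the recorded position is returned; otherwise the condition's `after`
    /// is advanced to the current head and the request is forwarded to the
    /// writer thread.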
    pub async fn append(
        &self,
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
    ) -> DCBResult<u64> {
        let pre_append_decision = if let Some(mut given_condition) = condition {
            let reader = self.mvcc.reader()?;
            let current_head = {
                let last = reader.next_position.0.saturating_sub(1);
                if last == 0 { None } else { Some(last) }
            };

            let from = given_condition.after.map(|after| Position(after + 1));
            let empty_dirty = std::collections::HashMap::new();
            let found = read_conditional(
                &self.mvcc,
                &empty_dirty,
                reader.events_tree_root_id,
                reader.tags_tree_root_id,
                given_condition.fail_if_events_match.clone(),
                from,
                false,
                Some(1),
                false,
            )?;

            if let Some(matched) = found.first() {
                // The condition already matches an existing event: either this exact
                // request was appended before (idempotent retry) or it is a genuine
                // integrity violation.
                match is_request_idempotent(
                    &self.mvcc,
                    &empty_dirty,
                    reader.events_tree_root_id,
                    reader.tags_tree_root_id,
                    &events,
                    given_condition.fail_if_events_match.clone(),
                    from,
                ) {
                    Ok(Some(last_recorded_position)) => {
                        PreAppendDecision::AlreadyAppended(last_recorded_position)
                    }
                    Ok(None) => {
                        let msg = format!(
                            "condition: {:?} matched: {:?}",
                            given_condition.clone(),
                            matched,
                        );
                        return Err(DCBError::IntegrityError(msg));
                    }
                    Err(err) => {
                        return Err(err);
                    }
                }
            } else {
                // No conflicting events so far: advance `after` to the current head so
                // the writer only needs to check events appended after this snapshot.
                let new_after = std::cmp::max(
                    given_condition.after.unwrap_or(0),
                    current_head.unwrap_or(0),
                );
                given_condition.after = Some(new_after);

                PreAppendDecision::UseCondition(Some(given_condition))
            }
        } else {
            PreAppendDecision::UseCondition(None)
        };

        match pre_append_decision {
            PreAppendDecision::AlreadyAppended(last_found_position) => Ok(last_found_position),
            PreAppendDecision::UseCondition(adjusted_condition) => {
                let (response_tx, response_rx) = oneshot::channel();

                self.writer_request_tx
                    .send(WriterRequest::Append {
                        events,
                        condition: adjusted_condition,
                        response_tx,
                    })
                    .await
                    .map_err(|_| {
                        DCBError::Io(std::io::Error::other(
                            "Failed to send append request to EventStore thread",
                        ))
                    })?;

                response_rx.await.map_err(|_| {
                    DCBError::Io(std::io::Error::other(
                        "Failed to receive append response from EventStore thread",
                    ))
                })?
            }
        }
    }

    fn watch_head(&self) -> watch::Receiver<Option<u64>> {
        self.head_watch_tx.subscribe()
    }

    #[allow(dead_code)]
    async fn shutdown(&self) {
        let _ = self.writer_request_tx.send(WriterRequest::Shutdown).await;
    }
}
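
/// Manual "clone" for DCBError, used to fan a single commit error out to every
/// waiting responder. `DCBError::Io` wraps `std::io::Error`, which is not
/// `Clone`, so the io variant is rebuilt from its message.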
fn clone_dcb_error(src: &DCBError) -> DCBError {
    match src {
        DCBError::Io(err) => DCBError::Io(std::io::Error::other(err.to_string())),
        DCBError::IntegrityError(s) => DCBError::IntegrityError(s.clone()),
        DCBError::Corruption(s) => DCBError::Corruption(s.clone()),
        DCBError::PageNotFound(id) => DCBError::PageNotFound(*id),
        DCBError::DirtyPageNotFound(id) => DCBError::DirtyPageNotFound(*id),
        DCBError::RootIDMismatch(old_id, new_id) => DCBError::RootIDMismatch(*old_id, *new_id),
        DCBError::DatabaseCorrupted(s) => DCBError::DatabaseCorrupted(s.clone()),
        DCBError::InternalError(s) => DCBError::InternalError(s.clone()),
        DCBError::SerializationError(s) => DCBError::SerializationError(s.clone()),
        DCBError::DeserializationError(s) => DCBError::DeserializationError(s.clone()),
        DCBError::PageAlreadyFreed(id) => DCBError::PageAlreadyFreed(*id),
        DCBError::PageAlreadyDirty(id) => DCBError::PageAlreadyDirty(*id),
        DCBError::TransportError(err) => DCBError::TransportError(err.clone()),
        DCBError::CancelledByUser() => DCBError::CancelledByUser(),
    }
}

impl Clone for RequestHandler {
    fn clone(&self) -> Self {
        Self {
            mvcc: self.mvcc.clone(),
            head_watch_tx: self.head_watch_tx.clone(),
            writer_request_tx: self.writer_request_tx.clone(),
        }
    }
}
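
/// Outcome of the condition pre-check in `RequestHandler::append`: either the
/// request was already applied (return the recorded position) or it should be
/// forwarded to the writer with a possibly adjusted condition.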
#[derive(Debug)]
enum PreAppendDecision {
    UseCondition(Option<DCBAppendCondition>),
    AlreadyAppended(u64),
}