1use futures::Stream;
2use std::fs;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::LazyLock;
7use std::thread;
8use std::time::Instant;
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio_stream::wrappers::ReceiverStream;
11use tonic::transport::{Identity, ServerTlsConfig};
12use tonic::{Request, Response, Status, transport::Server};
13use umadb_core::db::{
14 DEFAULT_DB_FILENAME, DEFAULT_PAGE_SIZE, UmaDB, is_request_idempotent, read_conditional,
15};
16use umadb_core::mvcc::Mvcc;
17use umadb_dcb::{DCBAppendCondition, DCBError, DCBEvent, DCBQuery, DCBResult, DCBSequencedEvent};
18
19use tokio::runtime::Runtime;
20use tonic::codegen::http;
21use tonic::transport::server::TcpIncoming;
22use umadb_core::common::Position;
23
24use std::convert::Infallible;
25use std::future::Future;
26use std::task::{Context, Poll};
27use tonic::server::NamedService;
28use umadb_proto::status_from_dcb_error;
29
/// Tower middleware that rewrites legacy request paths
/// (`/umadb.UmaDBService/...`) to the current `/umadb.v1.DCB/...` service
/// path before delegating to the wrapped service.
#[derive(Clone, Debug)]
pub struct PathRewriterService<S> {
    // The downstream service that receives the (possibly rewritten) request.
    inner: S,
}
35
36impl<S> tower::Service<http::Request<tonic::body::Body>> for PathRewriterService<S>
37where
38 S: tower::Service<
39 http::Request<tonic::body::Body>,
40 Response = http::Response<tonic::body::Body>,
41 Error = Infallible,
42 > + Clone
43 + Send
44 + 'static,
45 S::Future: Send + 'static,
46{
47 type Response = S::Response;
48 type Error = S::Error;
49 type Future =
50 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
51
52 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53 self.inner.poll_ready(cx)
54 }
55
56 fn call(&mut self, mut req: http::Request<tonic::body::Body>) -> Self::Future {
57 let uri = req.uri().clone();
58 let path = uri.path();
59
60 if path.starts_with("/umadb.UmaDBService/") {
62 let new_path_str = path.replace("/umadb.UmaDBService/", "/umadb.v1.DCB/");
63
64 let new_uri = if let (Some(scheme), Some(authority)) = (uri.scheme(), uri.authority()) {
67 http::Uri::builder()
69 .scheme(scheme.clone())
70 .authority(authority.clone())
71 .path_and_query(new_path_str.as_str())
72 .build()
73 .ok() } else {
75 new_path_str.parse::<http::Uri>().ok()
78 };
79
80 if let Some(final_uri) = new_uri {
81 *req.uri_mut() = final_uri;
82 } else {
83 eprintln!("Failed to construct valid URI for path: {}", path);
84 }
85 }
86
87 let fut = self.inner.call(req);
88 Box::pin(fut)
89 }
90}
91
// Forward the wrapped service's gRPC service name so tonic's router still
// registers this middleware-wrapped service under the inner service's path.
impl<S: NamedService> NamedService for PathRewriterService<S> {
    const NAME: &'static str = S::NAME;
}
96
/// Tower layer that wraps a service in [`PathRewriterService`].
#[derive(Clone, Debug)]
pub struct PathRewriterLayer;
99
impl<S> tower::Layer<S> for PathRewriterLayer
where
    S: tower::Service<
            http::Request<tonic::body::Body>,
            Response = http::Response<tonic::body::Body>,
            Error = Infallible,
        > + Clone
        + Send
        + 'static,
    S::Future: Send + 'static,
{
    type Service = PathRewriterService<S>;

    /// Wraps `inner` in the legacy-path rewriting middleware.
    fn layer(&self, inner: S) -> Self::Service {
        PathRewriterService { inner }
    }
}
117
// Process start marker. NOTE(review): `LazyLock` records the instant of the
// FIRST ACCESS, not literal process start — `uptime()` therefore measures from
// whichever call touches this first; confirm that is acceptable.
static START_TIME: LazyLock<Instant> = LazyLock::new(Instant::now);

// Maximum number of events the writer thread coalesces into one commit batch.
const APPEND_BATCH_MAX_EVENTS: usize = 2000;
// Events per ReadResponse when the client does not specify a batch size.
const READ_RESPONSE_BATCH_SIZE_DEFAULT: u32 = 100;
// Upper bound that a client-supplied batch_size is clamped to.
const READ_RESPONSE_BATCH_SIZE_MAX: u32 = 5000;
123
/// PEM-encoded TLS material used to terminate TLS on the server listener.
#[derive(Clone, Debug)]
pub struct ServerTlsOptions {
    // PEM-encoded certificate (or chain) presented to clients.
    pub cert_pem: Vec<u8>,
    // PEM-encoded private key matching `cert_pem`.
    pub key_pem: Vec<u8>,
}
130
131pub fn uptime() -> std::time::Duration {
132 START_TIME.elapsed()
133}
134
135fn build_server_builder_with_options(tls: Option<ServerTlsOptions>) -> Server {
136 use std::time::Duration;
137 let mut server_builder = Server::builder()
138 .http2_keepalive_interval(Some(Duration::from_secs(5)))
139 .http2_keepalive_timeout(Some(Duration::from_secs(10)))
140 .initial_stream_window_size(Some(4 * 1024 * 1024))
141 .initial_connection_window_size(Some(8 * 1024 * 1024))
142 .tcp_nodelay(true)
143 .concurrency_limit_per_connection(1024);
144
145 if let Some(opts) = tls {
146 let identity = Identity::from_pem(opts.cert_pem, opts.key_pem);
147 server_builder = server_builder
148 .tls_config(ServerTlsConfig::new().identity(identity))
149 .expect("failed to apply TLS config");
150 }
151
152 server_builder
153}
154
/// Starts a plaintext (no TLS, no API key) UmaDB gRPC server on `addr`,
/// serving the database at `path` until `shutdown_rx` fires.
pub async fn start_server<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None, None).await
}
163
164pub async fn start_server_secure<P: AsRef<Path> + Send + 'static>(
166 path: P,
167 addr: &str,
168 shutdown_rx: oneshot::Receiver<()>,
169 cert_pem: Vec<u8>,
170 key_pem: Vec<u8>,
171) -> Result<(), Box<dyn std::error::Error>> {
172 let tls = ServerTlsOptions { cert_pem, key_pem };
173 start_server_internal(path, addr, shutdown_rx, Some(tls), None).await
174}
175
176pub async fn start_server_secure_from_files<
178 P: AsRef<Path> + Send + 'static,
179 CP: AsRef<Path>,
180 KP: AsRef<Path>,
181>(
182 path: P,
183 addr: &str,
184 shutdown_rx: oneshot::Receiver<()>,
185 cert_path: CP,
186 key_path: KP,
187) -> Result<(), Box<dyn std::error::Error>> {
188 let cert_path_ref = cert_path.as_ref();
189 let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
190 format!(
191 "Failed to open TLS certificate file '{}': {}",
192 cert_path_ref.display(),
193 e
194 )
195 .into()
196 })?;
197
198 let key_path_ref = key_path.as_ref();
199 let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
200 format!(
201 "Failed to open TLS key file '{}': {}",
202 key_path_ref.display(),
203 e
204 )
205 .into()
206 })?;
207 start_server_secure(path, addr, shutdown_rx, cert_pem, key_pem).await
208}
209
/// Starts a plaintext server that additionally requires every request to carry
/// `authorization: Bearer <api_key>` metadata.
pub async fn start_server_with_api_key<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    api_key: String,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None, Some(api_key)).await
}
219
220pub async fn start_server_secure_with_api_key<P: AsRef<Path> + Send + 'static>(
222 path: P,
223 addr: &str,
224 shutdown_rx: oneshot::Receiver<()>,
225 cert_pem: Vec<u8>,
226 key_pem: Vec<u8>,
227 api_key: String,
228) -> Result<(), Box<dyn std::error::Error>> {
229 let tls = ServerTlsOptions { cert_pem, key_pem };
230 start_server_internal(path, addr, shutdown_rx, Some(tls), Some(api_key)).await
231}
232
233pub async fn start_server_secure_from_files_with_api_key<
235 P: AsRef<Path> + Send + 'static,
236 CP: AsRef<Path>,
237 KP: AsRef<Path>,
238>(
239 path: P,
240 addr: &str,
241 shutdown_rx: oneshot::Receiver<()>,
242 cert_path: CP,
243 key_path: KP,
244 api_key: String,
245) -> Result<(), Box<dyn std::error::Error>> {
246 let cert_path_ref = cert_path.as_ref();
247 let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
248 format!(
249 "Failed to open TLS certificate file '{}': {}",
250 cert_path_ref.display(),
251 e
252 )
253 .into()
254 })?;
255
256 let key_path_ref = key_path.as_ref();
257 let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
258 format!(
259 "Failed to open TLS key file '{}': {}",
260 key_path_ref.display(),
261 e
262 )
263 .into()
264 })?;
265 start_server_secure_with_api_key(path, addr, shutdown_rx, cert_pem, key_pem, api_key).await
266}
267
/// Shared implementation behind all `start_server*` variants.
///
/// Binds `addr`, opens the database, registers gRPC health reporting, and
/// serves until `shutdown_rx` resolves. On shutdown it flips the health
/// services to NotServing and signals in-flight read subscriptions to stop.
/// `tls` and `api_key` are independent and both optional.
async fn start_server_internal<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    tls: Option<ServerTlsOptions>,
    api_key: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let addr = addr.parse()?;
    // Bind eagerly so an unusable address is reported before any DB work.
    let incoming = match TcpIncoming::bind(addr) {
        Ok(incoming) => incoming,
        Err(err) => {
            return Err(Box::new(DCBError::InitializationError(format!(
                "Failed to bind to address {}: {}",
                addr, err
            ))));
        }
    }
    .with_nodelay(Some(true))
    .with_keepalive(Some(std::time::Duration::from_secs(60)));

    // Watch channel used to tell long-lived read subscriptions that the server
    // is shutting down.
    let (srv_shutdown_tx, srv_shutdown_rx) = watch::channel(false);
    let dcb_server =
        match DCBServer::new(path.as_ref().to_owned(), srv_shutdown_rx, api_key.clone()) {
            Ok(server) => server,
            Err(err) => {
                return Err(Box::new(err));
            }
        };

    println!(
        "UmaDB has {:?} events",
        dcb_server.request_handler.head().await?.unwrap_or(0)
    );
    let tls_mode_display_str = if tls.is_some() {
        "with TLS"
    } else {
        "without TLS"
    };

    let api_key_display_str = if api_key.is_some() {
        "with API key"
    } else {
        "without API key"
    };

    // Standard gRPC health service; both the blanket ("") and the concrete
    // service names are marked Serving up front.
    use tonic_health::ServingStatus; let (health_reporter, health_service) = tonic_health::server::health_reporter();
    health_reporter
        .set_service_status("", ServingStatus::Serving)
        .await;
    health_reporter
        .set_service_status("umadb.v1.DCB", ServingStatus::Serving)
        .await;
    let health_reporter_for_shutdown = health_reporter.clone();

    // The path-rewriting layer applies to every service added below.
    let mut builder = build_server_builder_with_options(tls)
        .layer(PathRewriterLayer)
        .add_service(health_service);

    builder = builder.add_service(dcb_server.into_service());
    let router = builder;

    println!("UmaDB is listening on {addr} ({tls_mode_display_str}, {api_key_display_str})");
    println!("UmaDB started in {:?}", uptime());
    router
        .serve_with_incoming_shutdown(incoming, async move {
            let _ = shutdown_rx.await;
            // Flip health to NotServing first so load balancers drain us, then
            // signal subscriptions to stop.
            let _ = health_reporter_for_shutdown
                .set_service_status("", ServingStatus::NotServing)
                .await;
            let _ = health_reporter_for_shutdown
                .set_service_status("umadb.v1.DCB", ServingStatus::NotServing)
                .await;
            let _ = srv_shutdown_tx.send(true);
            println!("UmaDB server shutdown complete");
        })
        .await?;

    Ok(())
}
358
/// gRPC service implementation of the UmaDB DCB API.
pub struct DCBServer {
    // Cheaply clonable handle shared by all RPC methods.
    pub(crate) request_handler: RequestHandler,
    // Flips to `true` when the server shuts down; ends read subscriptions.
    shutdown_watch_rx: watch::Receiver<bool>,
    // When set, every RPC must present `authorization: Bearer <key>`.
    api_key: Option<String>,
}
365
366impl DCBServer {
367 pub fn new<P: AsRef<Path> + Send + 'static>(
368 path: P,
369 shutdown_rx: watch::Receiver<bool>,
370 api_key: Option<String>,
371 ) -> DCBResult<Self> {
372 let command_handler = RequestHandler::new(path)?;
373 Ok(Self {
374 request_handler: command_handler,
375 shutdown_watch_rx: shutdown_rx,
376 api_key,
377 })
378 }
379
380 pub fn into_service(self) -> umadb_proto::v1::dcb_server::DcbServer<Self> {
381 umadb_proto::v1::dcb_server::DcbServer::new(self)
382 }
383}
384
385#[tonic::async_trait]
386impl umadb_proto::v1::dcb_server::Dcb for DCBServer {
387 type ReadStream =
388 Pin<Box<dyn Stream<Item = Result<umadb_proto::v1::ReadResponse, Status>> + Send + 'static>>;
389
390 async fn read(
391 &self,
392 request: Request<umadb_proto::v1::ReadRequest>,
393 ) -> Result<Response<Self::ReadStream>, Status> {
394 if let Some(expected) = &self.api_key {
396 let auth = request.metadata().get("authorization");
397 let expected_val = format!("Bearer {}", expected);
398 let ok = auth
399 .and_then(|m| m.to_str().ok())
400 .map(|s| s == expected_val)
401 .unwrap_or(false);
402 if !ok {
403 return Err(status_from_dcb_error(DCBError::AuthenticationError(
404 "missing or invalid API key".to_string(),
405 )));
406 }
407 }
408 let read_request = request.into_inner();
409
410 let mut query: Option<DCBQuery> = read_request.query.map(|q| q.into());
412 let start = read_request.start;
413 let backwards = read_request.backwards.unwrap_or(false);
414 let limit = read_request.limit;
415 let batch_size = read_request
417 .batch_size
418 .unwrap_or(READ_RESPONSE_BATCH_SIZE_DEFAULT)
419 .clamp(1, READ_RESPONSE_BATCH_SIZE_MAX);
420 let subscribe = read_request.subscribe.unwrap_or(false);
421
422 let (tx, rx) = mpsc::channel(2048);
424 let request_handler = self.request_handler.clone();
426 let mut shutdown_watch_rx = self.shutdown_watch_rx.clone();
428
429 tokio::spawn(async move {
431 let query_clone = query.take();
433 let mut next_start = start;
434 let mut sent_any = false;
435 let mut remaining_limit = limit.unwrap_or(u32::MAX);
436 let mut head_rx = request_handler.watch_head();
439 let captured_head = if !subscribe {
441 request_handler.head().await.unwrap_or(None)
442 } else {
443 None
444 };
445 loop {
446 if subscribe {
449 if tx.is_closed() {
450 break;
451 }
452 if *shutdown_watch_rx.borrow() {
453 break;
454 }
455 }
456 let read_limit = remaining_limit.min(batch_size);
458 if subscribe && limit.is_some() && remaining_limit == 0 {
460 break;
461 }
462 match request_handler
463 .read(query_clone.clone(), next_start, backwards, Some(read_limit))
464 .await
465 {
466 Ok((dcb_sequenced_events, head)) => {
467 let original_len = dcb_sequenced_events.len();
469
470 let sequenced_event_protos: Vec<umadb_proto::v1::SequencedEvent> =
472 dcb_sequenced_events
473 .into_iter()
474 .filter(|e| {
475 if let Some(h) = captured_head {
476 e.position <= h
477 } else {
478 true
479 }
480 })
481 .map(umadb_proto::v1::SequencedEvent::from)
482 .collect();
483
484 let reached_captured_head = if captured_head.is_some() {
485 sequenced_event_protos.len() < original_len
487 } else {
488 false
489 };
490
491 let last_event_position = sequenced_event_protos.last().map(|e| e.position);
496 let head_to_send = if subscribe {
497 head
498 } else if limit.is_none() {
499 captured_head
500 } else {
501 last_event_position.or(head)
502 };
503
504 if sequenced_event_protos.is_empty() {
505 if !sent_any {
507 let response = umadb_proto::v1::ReadResponse {
508 events: vec![],
509 head: head_to_send,
510 };
511 let _ = tx.send(Ok(response)).await;
512 }
513 if subscribe {
515 loop {
517 if tx.is_closed() {
519 break;
520 }
521 let current_head = *head_rx.borrow();
522 if current_head
523 .map(|h| h >= next_start.unwrap_or(1))
524 .unwrap_or(false)
525 {
526 break; }
528 tokio::select! {
530 res = head_rx.changed() => {
531 if res.is_err() { break; }
532 }
533 res2 = shutdown_watch_rx.changed() => {
534 if res2.is_ok() {
535 if *shutdown_watch_rx.borrow() { break; }
537 } else {
538 break; }
540 }
541 }
542 }
543 continue;
544 }
545 break;
546 }
547
548 let sent_count = sequenced_event_protos.len() as u32;
550
551 let response = umadb_proto::v1::ReadResponse {
552 events: sequenced_event_protos,
553 head: head_to_send,
554 };
555
556 if tx.send(Ok(response)).await.is_err() {
557 break;
558 }
559 sent_any = true;
560
561 next_start =
563 last_event_position.map(|p| if !backwards { p + 1 } else { p - 1 });
564
565 if reached_captured_head && !subscribe {
568 break;
569 }
570
571 if limit.is_some() {
573 if remaining_limit <= sent_count {
574 remaining_limit = 0;
575 } else {
576 remaining_limit -= sent_count;
577 }
578 if remaining_limit == 0 {
579 break;
580 }
581 }
582
583 tokio::task::yield_now().await;
585 }
586 Err(e) => {
587 let _ = tx.send(Err(status_from_dcb_error(e))).await;
588 break;
589 }
590 }
591 }
592 });
593
594 Ok(Response::new(
596 Box::pin(ReceiverStream::new(rx)) as Self::ReadStream
597 ))
598 }
599
600 async fn append(
601 &self,
602 request: Request<umadb_proto::v1::AppendRequest>,
603 ) -> Result<Response<umadb_proto::v1::AppendResponse>, Status> {
604 if let Some(expected) = &self.api_key {
606 let auth = request.metadata().get("authorization");
607 let expected_val = format!("Bearer {}", expected);
608 let ok = auth
609 .and_then(|m| m.to_str().ok())
610 .map(|s| s == expected_val)
611 .unwrap_or(false);
612 if !ok {
613 return Err(status_from_dcb_error(DCBError::AuthenticationError(
614 "missing or invalid API key".to_string(),
615 )));
616 }
617 }
618 let req = request.into_inner();
619
620 let events: Vec<DCBEvent> = match req.events.into_iter().map(|e| e.try_into()).collect() {
622 Ok(events) => events,
623 Err(e) => {
624 return Err(status_from_dcb_error(e));
625 }
626 };
627 let condition = req.condition.map(|c| c.into());
628
629 match self.request_handler.append(events, condition).await {
631 Ok(position) => Ok(Response::new(umadb_proto::v1::AppendResponse { position })),
632 Err(e) => Err(status_from_dcb_error(e)),
633 }
634 }
635
636 async fn head(
637 &self,
638 request: Request<umadb_proto::v1::HeadRequest>,
639 ) -> Result<Response<umadb_proto::v1::HeadResponse>, Status> {
640 if let Some(expected) = &self.api_key {
642 let auth = request.metadata().get("authorization");
643 let expected_val = format!("Bearer {}", expected);
644 let ok = auth
645 .and_then(|m| m.to_str().ok())
646 .map(|s| s == expected_val)
647 .unwrap_or(false);
648 if !ok {
649 return Err(status_from_dcb_error(DCBError::AuthenticationError(
650 "missing or invalid API key".to_string(),
651 )));
652 }
653 }
654 match self.request_handler.head().await {
656 Ok(position) => {
657 Ok(Response::new(umadb_proto::v1::HeadResponse { position }))
659 }
660 Err(e) => Err(status_from_dcb_error(e)),
661 }
662 }
663}
664
/// Messages accepted by the dedicated writer thread.
enum WriterRequest {
    // A batch of events to append; the outcome (position of the last appended
    // event, or an error) is delivered back through `response_tx`.
    Append {
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        response_tx: oneshot::Sender<DCBResult<u64>>,
    },
    // Ask the writer thread to exit its receive loop.
    Shutdown,
}
674
/// Cheaply clonable handle shared by all RPC handlers: reads use MVCC
/// snapshots directly, while appends are funneled to a single writer thread.
struct RequestHandler {
    // Shared MVCC storage engine handle.
    mvcc: Arc<Mvcc>,
    // Publishes the latest committed head position to subscribers.
    head_watch_tx: watch::Sender<Option<u64>>,
    // Channel into the dedicated writer thread.
    writer_request_tx: mpsc::Sender<WriterRequest>,
}
681
impl RequestHandler {
    /// Opens (or creates) the database file and spawns the dedicated writer
    /// thread that serializes and batches all append requests.
    fn new<P: AsRef<Path> + Send + 'static>(path: P) -> DCBResult<Self> {
        let (request_tx, mut request_rx) = mpsc::channel::<WriterRequest>(1024);

        // Accept either a directory (use the default DB filename inside it) or
        // a direct file path.
        let p = path.as_ref();
        let file_path = if p.is_dir() {
            p.join(DEFAULT_DB_FILENAME)
        } else {
            p.to_path_buf()
        };
        let mvcc = Arc::new(Mvcc::new(&file_path, DEFAULT_PAGE_SIZE, false)?);

        // Seed the head watch with the last committed position (None if empty).
        let init_head = {
            let (_, header) = mvcc.get_latest_header()?;
            let last = header.next_position.0.saturating_sub(1);
            if last == 0 { None } else { Some(last) }
        };
        let (head_tx, _head_rx) = watch::channel::<Option<u64>>(init_head);

        let mvcc_for_writer = mvcc.clone();
        let head_tx_writer = head_tx.clone();
        // Single writer thread: all appends flow through one channel so only
        // one MVCC writer transaction exists at a time.
        thread::spawn(move || {
            let db = UmaDB::from_arc(mvcc_for_writer);

            // Dedicated runtime so the writer can await channel recv without
            // occupying any server-runtime worker.
            let rt = Runtime::new().unwrap();

            rt.block_on(async {
                while let Some(request) = request_rx.recv().await {
                    match request {
                        WriterRequest::Append {
                            events,
                            condition,
                            response_tx,
                        } => {
                            // One oneshot responder per coalesced append request,
                            // in the same order as `results` below.
                            let mut responders: Vec<oneshot::Sender<DCBResult<u64>>> = Vec::new();

                            let mut total_events = 0;
                            total_events += events.len();
                            let mvcc = &db.mvcc;
                            let mut writer = match mvcc.writer() {
                                Ok(writer) => writer,
                                Err(err) => {
                                    let _ = response_tx.send(Err(err));
                                    continue;
                                }
                            };
                            responders.push(response_tx);

                            let mut results: Vec<DCBResult<u64>> = Vec::new();

                            UmaDB::process_append_request(
                                events,
                                condition,
                                false,
                                mvcc,
                                &mut writer,
                                &mut results,
                            );

                            // Opportunistically drain queued appends into the
                            // same commit, capped by APPEND_BATCH_MAX_EVENTS.
                            loop {
                                if total_events >= APPEND_BATCH_MAX_EVENTS {
                                    break;
                                }
                                match request_rx.try_recv() {
                                    Ok(WriterRequest::Append {
                                        events,
                                        condition,
                                        response_tx,
                                    }) => {
                                        let ev_len = events.len();
                                        UmaDB::process_append_request(
                                            events,
                                            condition,
                                            false,
                                            mvcc,
                                            &mut writer,
                                            &mut results,
                                        );
                                        responders.push(response_tx);
                                        total_events += ev_len;
                                    }
                                    Ok(WriterRequest::Shutdown) => {
                                        // NOTE(review): a Shutdown drained here is
                                        // consumed and only ends this batching
                                        // loop; the outer recv loop keeps running.
                                        // Confirm this is intended.
                                        break;
                                    }
                                    Err(mpsc::error::TryRecvError::Empty) => break,
                                    Err(mpsc::error::TryRecvError::Disconnected) => break,
                                }
                            }
                            // Commit the whole batch atomically; on failure every
                            // waiting requester receives a copy of the error.
                            let batch_result = match mvcc.commit(&mut writer) {
                                Ok(_) => Ok(results),
                                Err(err) => Err(err),
                            };

                            match batch_result {
                                Ok(results) => {
                                    // Publish the highest successfully appended
                                    // position to head watchers.
                                    let mut max_ok: Option<u64> = None;
                                    for (res, tx) in results.into_iter().zip(responders.into_iter())
                                    {
                                        if let Ok(v) = &res {
                                            max_ok = Some(max_ok.map_or(*v, |m| m.max(*v)));
                                        }
                                        let _ = tx.send(res);
                                    }
                                    if let Some(h) = max_ok {
                                        let _ = head_tx_writer.send(Some(h));
                                    }
                                }
                                Err(e) => {
                                    // Fan the commit error out to every waiter.
                                    let total = responders.len();
                                    let mut iter = responders.into_iter();
                                    for _ in 0..total {
                                        if let Some(tx) = iter.next() {
                                            let _ = tx.send(Err(clone_dcb_error(&e)));
                                        }
                                    }
                                }
                            }
                        }
                        WriterRequest::Shutdown => {
                            break;
                        }
                    }
                }
            });
        });

        Ok(Self {
            mvcc,
            head_watch_tx: head_tx,
            writer_request_tx: request_tx,
        })
    }

    /// Snapshot read against the latest committed MVCC state.
    ///
    /// Returns the matching events plus a head value: the last committed
    /// position when no limit is given, otherwise the position of the last
    /// returned event.
    async fn read(
        &self,
        query: Option<DCBQuery>,
        start: Option<u64>,
        backwards: bool,
        limit: Option<u32>,
    ) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
        let reader = self.mvcc.reader()?;
        let last_committed_position = reader.next_position.0.saturating_sub(1);

        let q = query.unwrap_or(DCBQuery { items: vec![] });
        let start_position = start.map(Position);

        let events = read_conditional(
            &self.mvcc,
            &std::collections::HashMap::new(),
            reader.events_tree_root_id,
            reader.tags_tree_root_id,
            q,
            start_position,
            backwards,
            limit,
            false,
        )
        .map_err(|e| DCBError::Corruption(format!("{e}")))?;

        let head = if limit.is_none() {
            if last_committed_position == 0 {
                None
            } else {
                Some(last_committed_position)
            }
        } else {
            events.last().map(|e| e.position)
        };

        Ok((events, head))
    }

    /// Last committed position, or `None` for an empty database.
    async fn head(&self) -> DCBResult<Option<u64>> {
        let (_, header) = self
            .mvcc
            .get_latest_header()
            .map_err(|e| DCBError::Corruption(format!("{e}")))?;
        let last = header.next_position.0.saturating_sub(1);
        if last == 0 { Ok(None) } else { Ok(Some(last)) }
    }
    /// Appends `events`, optionally guarded by an append condition.
    ///
    /// Performs an optimistic pre-check on a read snapshot: if the condition
    /// already matches an existing event, the request is either recognized as
    /// an idempotent replay (returning the previously recorded position) or
    /// rejected with an integrity error. Otherwise the condition's `after` is
    /// advanced to the current head so the writer thread re-checks only events
    /// committed after this snapshot.
    pub async fn append(
        &self,
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
    ) -> DCBResult<u64> {
        let pre_append_decision = if let Some(mut given_condition) = condition {
            let reader = self.mvcc.reader()?;
            let current_head = {
                let last = reader.next_position.0.saturating_sub(1);
                if last == 0 { None } else { Some(last) }
            };

            let from = given_condition.after.map(|after| Position(after + 1));
            let empty_dirty = std::collections::HashMap::new();
            // Cheap probe: does any existing event already violate the condition?
            let found = read_conditional(
                &self.mvcc,
                &empty_dirty,
                reader.events_tree_root_id,
                reader.tags_tree_root_id,
                given_condition.fail_if_events_match.clone(),
                from,
                false,
                Some(1),
                false,
            )?;

            if let Some(matched) = found.first() {
                // A match may still be this exact request replayed; check for
                // idempotency before failing.
                match is_request_idempotent(
                    &self.mvcc,
                    &empty_dirty,
                    reader.events_tree_root_id,
                    reader.tags_tree_root_id,
                    &events,
                    given_condition.fail_if_events_match.clone(),
                    from,
                ) {
                    Ok(Some(last_recorded_position)) => {
                        PreAppendDecision::AlreadyAppended(last_recorded_position)
                    }
                    Ok(None) => {
                        let msg = format!(
                            "condition: {:?} matched: {:?}",
                            given_condition.clone(),
                            matched,
                        );
                        return Err(DCBError::IntegrityError(msg));
                    }
                    Err(err) => {
                        return Err(err);
                    }
                }
            } else {
                // No conflict visible in this snapshot: advance `after` to the
                // current head so the writer only re-checks newer events.
                let new_after = std::cmp::max(
                    given_condition.after.unwrap_or(0),
                    current_head.unwrap_or(0),
                );
                given_condition.after = Some(new_after);

                PreAppendDecision::UseCondition(Some(given_condition))
            }
        } else {
            PreAppendDecision::UseCondition(None)
        };

        match pre_append_decision {
            PreAppendDecision::AlreadyAppended(last_found_position) => {
                Ok(last_found_position)
            }
            PreAppendDecision::UseCondition(adjusted_condition) => {
                let (response_tx, response_rx) = oneshot::channel();

                self.writer_request_tx
                    .send(WriterRequest::Append {
                        events,
                        condition: adjusted_condition,
                        response_tx,
                    })
                    .await
                    .map_err(|_| {
                        DCBError::Io(std::io::Error::other(
                            "Failed to send append request to EventStore thread",
                        ))
                    })?;

                response_rx.await.map_err(|_| {
                    DCBError::Io(std::io::Error::other(
                        "Failed to receive append response from EventStore thread",
                    ))
                })?
            }
        }
    }

    /// New subscription to head-position updates published by the writer thread.
    fn watch_head(&self) -> watch::Receiver<Option<u64>> {
        self.head_watch_tx.subscribe()
    }

    /// Best-effort request for the writer thread to stop.
    #[allow(dead_code)]
    async fn shutdown(&self) {
        let _ = self.writer_request_tx.send(WriterRequest::Shutdown).await;
    }
}
1004
/// Deep-copies a `DCBError` so one commit failure can be fanned out to many
/// waiting responders.
///
/// A manual copy is required because the `Io` variant wraps `std::io::Error`,
/// which does not implement `Clone`; that arm is rebuilt from the error's
/// message (losing the original `ErrorKind`).
fn clone_dcb_error(src: &DCBError) -> DCBError {
    match src {
        DCBError::AuthenticationError(err) => DCBError::AuthenticationError(err.to_string()),
        DCBError::InitializationError(err) => DCBError::InitializationError(err.to_string()),
        DCBError::Io(err) => DCBError::Io(std::io::Error::other(err.to_string())),
        DCBError::IntegrityError(s) => DCBError::IntegrityError(s.clone()),
        DCBError::Corruption(s) => DCBError::Corruption(s.clone()),
        DCBError::PageNotFound(id) => DCBError::PageNotFound(*id),
        DCBError::DirtyPageNotFound(id) => DCBError::DirtyPageNotFound(*id),
        DCBError::RootIDMismatch(old_id, new_id) => DCBError::RootIDMismatch(*old_id, *new_id),
        DCBError::DatabaseCorrupted(s) => DCBError::DatabaseCorrupted(s.clone()),
        DCBError::InternalError(s) => DCBError::InternalError(s.clone()),
        DCBError::SerializationError(s) => DCBError::SerializationError(s.clone()),
        DCBError::DeserializationError(s) => DCBError::DeserializationError(s.clone()),
        DCBError::PageAlreadyFreed(id) => DCBError::PageAlreadyFreed(*id),
        DCBError::PageAlreadyDirty(id) => DCBError::PageAlreadyDirty(*id),
        DCBError::TransportError(err) => DCBError::TransportError(err.clone()),
        DCBError::CancelledByUser() => DCBError::CancelledByUser(),
    }
}
1025
// Manual Clone: every field is a cheaply clonable handle (Arc + channel
// senders), so a cloned RequestHandler shares the same database and writer.
impl Clone for RequestHandler {
    fn clone(&self) -> Self {
        Self {
            mvcc: self.mvcc.clone(),
            head_watch_tx: self.head_watch_tx.clone(),
            writer_request_tx: self.writer_request_tx.clone(),
        }
    }
}
1036
/// Outcome of the optimistic pre-append condition check in
/// `RequestHandler::append`.
#[derive(Debug)]
enum PreAppendDecision {
    // Forward the append to the writer with this (possibly adjusted) condition.
    UseCondition(Option<DCBAppendCondition>),
    // The identical request was already committed; return its last position.
    AlreadyAppended(u64),
}