1use futures::Stream;
2use std::fs;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::LazyLock;
7use std::thread;
8use std::time::Instant;
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio_stream::wrappers::ReceiverStream;
11use tonic::transport::{Identity, ServerTlsConfig};
12use tonic::{Request, Response, Status, transport::Server};
13use umadb_core::db::{
14 DEFAULT_DB_FILENAME, DEFAULT_PAGE_SIZE, UmaDb, clone_dcb_error, is_integrity_error,
15 is_request_idempotent, read_conditional, shadow_for_batch_abort,
16};
17use umadb_core::mvcc::Mvcc;
18use umadb_dcb::{
19 DcbAppendCondition, DcbEvent, DcbQuery, DcbResult, DcbSequencedEvent, DcbError, TrackingInfo,
20};
21
22use tokio::runtime::Runtime;
23use tonic::codegen::http;
24use tonic::transport::server::TcpIncoming;
25use umadb_core::common::Position;
26
27use std::convert::Infallible;
28use std::future::Future;
29use std::task::{Context, Poll};
30use tonic::server::NamedService;
31use umadb_proto::status_from_dcb_error;
32
/// Tower middleware that rewrites legacy gRPC request paths
/// (`/umadb.UmaDBService/...`) to the current `/umadb.v1.DCB/...` service
/// path before handing the request to the wrapped service.
#[derive(Clone, Debug)]
pub struct PathRewriterService<S> {
    // The wrapped inner service that receives the (possibly rewritten) request.
    inner: S,
}
38
39impl<S> tower::Service<http::Request<tonic::body::Body>> for PathRewriterService<S>
40where
41 S: tower::Service<
42 http::Request<tonic::body::Body>,
43 Response = http::Response<tonic::body::Body>,
44 Error = Infallible,
45 > + Clone
46 + Send
47 + 'static,
48 S::Future: Send + 'static,
49{
50 type Response = S::Response;
51 type Error = S::Error;
52 type Future =
53 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
54
55 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
56 self.inner.poll_ready(cx)
57 }
58
59 fn call(&mut self, mut req: http::Request<tonic::body::Body>) -> Self::Future {
60 let uri = req.uri().clone();
61 let path = uri.path();
62
63 if path.starts_with("/umadb.UmaDBService/") {
65 let new_path_str = path.replace("/umadb.UmaDBService/", "/umadb.v1.DCB/");
66
67 let new_uri = if let (Some(scheme), Some(authority)) = (uri.scheme(), uri.authority()) {
70 http::Uri::builder()
72 .scheme(scheme.clone())
73 .authority(authority.clone())
74 .path_and_query(new_path_str.as_str())
75 .build()
76 .ok() } else {
78 new_path_str.parse::<http::Uri>().ok()
81 };
82
83 if let Some(final_uri) = new_uri {
84 *req.uri_mut() = final_uri;
85 } else {
86 eprintln!("failed to construct valid URI for path: {}", path);
87 }
88 }
89
90 let fut = self.inner.call(req);
91 Box::pin(fut)
92 }
93}
94
/// Forward the inner service's gRPC service name so tonic's router still
/// registers the wrapped service under its canonical name.
impl<S: NamedService> NamedService for PathRewriterService<S> {
    const NAME: &'static str = S::NAME;
}
99
/// Tower layer that wraps a service in a [`PathRewriterService`].
#[derive(Clone, Debug)]
pub struct PathRewriterLayer;
102
103impl<S> tower::Layer<S> for PathRewriterLayer
104where
105 S: tower::Service<
106 http::Request<tonic::body::Body>,
107 Response = http::Response<tonic::body::Body>,
108 Error = Infallible,
109 > + Clone
110 + Send
111 + 'static,
112 S::Future: Send + 'static,
113{
114 type Service = PathRewriterService<S>;
115
116 fn layer(&self, inner: S) -> Self::Service {
117 PathRewriterService { inner }
118 }
119}
120
// Process start marker, captured lazily on first access; read by `uptime()`.
static START_TIME: LazyLock<Instant> = LazyLock::new(Instant::now);

// Maximum number of events coalesced into one writer-thread commit batch.
const APPEND_BATCH_MAX_EVENTS: usize = 2000;
// Default and hard upper bound for events per streamed read/subscribe response.
const READ_RESPONSE_BATCH_SIZE_DEFAULT: u32 = 100;
const READ_RESPONSE_BATCH_SIZE_MAX: u32 = 5000;
126
/// PEM-encoded certificate/key pair used to enable TLS on the gRPC server
/// (fed to `Identity::from_pem`).
#[derive(Clone, Debug)]
pub struct ServerTlsOptions {
    // PEM-encoded server certificate (chain).
    pub cert_pem: Vec<u8>,
    // PEM-encoded private key matching `cert_pem`.
    pub key_pem: Vec<u8>,
}
133
134pub fn uptime() -> std::time::Duration {
135 START_TIME.elapsed()
136}
137
138fn build_server_builder_with_options(tls: Option<ServerTlsOptions>) -> Server {
139 use std::time::Duration;
140 let mut server_builder = Server::builder()
141 .http2_keepalive_interval(Some(Duration::from_secs(5)))
142 .http2_keepalive_timeout(Some(Duration::from_secs(10)))
143 .initial_stream_window_size(Some(4 * 1024 * 1024))
144 .initial_connection_window_size(Some(8 * 1024 * 1024))
145 .tcp_nodelay(true)
146 .concurrency_limit_per_connection(1024);
147
148 if let Some(opts) = tls {
149 let identity = Identity::from_pem(opts.cert_pem, opts.key_pem);
150 server_builder = server_builder
151 .tls_config(ServerTlsConfig::new().identity(identity))
152 .expect("failed to apply TLS config");
153 }
154
155 server_builder
156}
157
/// Start a plaintext UmaDB gRPC server (no TLS, no API key).
///
/// * `path` — database file, or a directory (the default DB filename is used).
/// * `addr` — socket address string to bind, e.g. `"127.0.0.1:50051"`.
/// * `shutdown_rx` — completing this channel triggers graceful shutdown.
pub async fn start_server<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None, None).await
}
166
167pub async fn start_server_secure<P: AsRef<Path> + Send + 'static>(
169 path: P,
170 addr: &str,
171 shutdown_rx: oneshot::Receiver<()>,
172 cert_pem: Vec<u8>,
173 key_pem: Vec<u8>,
174) -> Result<(), Box<dyn std::error::Error>> {
175 let tls = ServerTlsOptions { cert_pem, key_pem };
176 start_server_internal(path, addr, shutdown_rx, Some(tls), None).await
177}
178
179pub async fn start_server_secure_from_files<
181 P: AsRef<Path> + Send + 'static,
182 CP: AsRef<Path>,
183 KP: AsRef<Path>,
184>(
185 path: P,
186 addr: &str,
187 shutdown_rx: oneshot::Receiver<()>,
188 cert_path: CP,
189 key_path: KP,
190) -> Result<(), Box<dyn std::error::Error>> {
191 let cert_path_ref = cert_path.as_ref();
192 let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
193 format!(
194 "failed to open TLS certificate file '{}': {}",
195 cert_path_ref.display(),
196 e
197 )
198 .into()
199 })?;
200
201 let key_path_ref = key_path.as_ref();
202 let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
203 format!(
204 "failed to open TLS key file '{}': {}",
205 key_path_ref.display(),
206 e
207 )
208 .into()
209 })?;
210 start_server_secure(path, addr, shutdown_rx, cert_pem, key_pem).await
211}
212
/// Start a plaintext server that requires every RPC to carry
/// `authorization: Bearer <api_key>` metadata; see [`start_server`] for the
/// remaining parameters.
pub async fn start_server_with_api_key<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    api_key: String,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None, Some(api_key)).await
}
222
223pub async fn start_server_secure_with_api_key<P: AsRef<Path> + Send + 'static>(
225 path: P,
226 addr: &str,
227 shutdown_rx: oneshot::Receiver<()>,
228 cert_pem: Vec<u8>,
229 key_pem: Vec<u8>,
230 api_key: String,
231) -> Result<(), Box<dyn std::error::Error>> {
232 let tls = ServerTlsOptions { cert_pem, key_pem };
233 start_server_internal(path, addr, shutdown_rx, Some(tls), Some(api_key)).await
234}
235
236pub async fn start_server_secure_from_files_with_api_key<
238 P: AsRef<Path> + Send + 'static,
239 CP: AsRef<Path>,
240 KP: AsRef<Path>,
241>(
242 path: P,
243 addr: &str,
244 shutdown_rx: oneshot::Receiver<()>,
245 cert_path: CP,
246 key_path: KP,
247 api_key: String,
248) -> Result<(), Box<dyn std::error::Error>> {
249 let cert_path_ref = cert_path.as_ref();
250 let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
251 format!(
252 "failed to open TLS certificate file '{}': {}",
253 cert_path_ref.display(),
254 e
255 )
256 .into()
257 })?;
258
259 let key_path_ref = key_path.as_ref();
260 let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
261 format!(
262 "failed to open TLS key file '{}': {}",
263 key_path_ref.display(),
264 e
265 )
266 .into()
267 })?;
268 start_server_secure_with_api_key(path, addr, shutdown_rx, cert_pem, key_pem, api_key).await
269}
270
/// Shared startup path for all public `start_server*` entry points.
///
/// Binds the TCP listener, opens the database, wires up the gRPC health
/// service and the legacy-path rewriter, then serves until `shutdown_rx`
/// resolves. On shutdown the health statuses are flipped to NotServing and a
/// shutdown signal is broadcast to the per-stream tasks via `srv_shutdown_tx`.
async fn start_server_internal<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    tls: Option<ServerTlsOptions>,
    api_key: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let addr = addr.parse()?;
    // Bind eagerly so address-in-use errors surface here, not inside serve().
    let incoming = match TcpIncoming::bind(addr) {
        Ok(incoming) => incoming,
        Err(err) => {
            return Err(Box::new(DcbError::InitializationError(format!(
                "failed to bind to address {}: {}",
                addr, err
            ))));
        }
    }
    .with_nodelay(Some(true))
    .with_keepalive(Some(std::time::Duration::from_secs(60)));

    // Watch channel used to tell long-lived read/subscribe tasks to stop.
    let (srv_shutdown_tx, srv_shutdown_rx) = watch::channel(false);
    let dcb_server =
        match DcbServer::new(path.as_ref().to_owned(), srv_shutdown_rx, api_key.clone()) {
            Ok(server) => server,
            Err(err) => {
                return Err(Box::new(err));
            }
        };

    // NOTE(review): this prints the head position as an event count — that
    // equivalence assumes positions are dense from 1; confirm against core.
    println!(
        "UmaDB has {:?} events",
        dcb_server.request_handler.head().await?.unwrap_or(0)
    );
    let tls_mode_display_str = if tls.is_some() {
        "with TLS"
    } else {
        "without TLS"
    };

    let api_key_display_str = if api_key.is_some() {
        "with API key"
    } else {
        "without API key"
    };

    // Standard gRPC health service; both the default ("") and the DCB service
    // names are marked Serving while the server is up.
    use tonic_health::ServingStatus; let (health_reporter, health_service) = tonic_health::server::health_reporter();
    health_reporter
        .set_service_status("", ServingStatus::Serving)
        .await;
    health_reporter
        .set_service_status("umadb.v1.DCB", ServingStatus::Serving)
        .await;
    let health_reporter_for_shutdown = health_reporter.clone();

    // The path-rewriter layer wraps every registered service, including the
    // health service, so legacy clients can reach both.
    let mut builder = build_server_builder_with_options(tls)
        .layer(PathRewriterLayer)
        .add_service(health_service);

    builder = builder.add_service(dcb_server.into_service());
    let router = builder;

    println!("UmaDB is listening on {addr} ({tls_mode_display_str}, {api_key_display_str})");
    println!("UmaDB started in {:?}", uptime());
    router
        .serve_with_incoming_shutdown(incoming, async move {
            // Wait for the caller's shutdown signal (or sender drop).
            let _ = shutdown_rx.await;
            let _ = health_reporter_for_shutdown
                .set_service_status("", ServingStatus::NotServing)
                .await;
            let _ = health_reporter_for_shutdown
                .set_service_status("umadb.v1.DCB", ServingStatus::NotServing)
                .await;
            // Tell in-flight streaming tasks to wind down.
            let _ = srv_shutdown_tx.send(true);
            println!("UmaDB server shutdown complete");
        })
        .await?;

    Ok(())
}
361
/// gRPC service implementation for the DCB API.
pub struct DcbServer {
    // Shared handle to the database: readers go direct, appends are funneled
    // through the single writer thread.
    pub(crate) request_handler: RequestHandler,
    // Receives `true` when the server is shutting down; cloned into each
    // long-lived streaming task.
    shutdown_watch_rx: watch::Receiver<bool>,
    // Optional bearer token; when set, every RPC must present it.
    api_key: Option<String>,
}
368
369impl DcbServer {
370 pub fn new<P: AsRef<Path> + Send + 'static>(
371 path: P,
372 shutdown_rx: watch::Receiver<bool>,
373 api_key: Option<String>,
374 ) -> DcbResult<Self> {
375 let command_handler = RequestHandler::new(path)?;
376 Ok(Self {
377 request_handler: command_handler,
378 shutdown_watch_rx: shutdown_rx,
379 api_key,
380 })
381 }
382
383 pub fn into_service(self) -> umadb_proto::v1::dcb_server::DcbServer<Self> {
384 umadb_proto::v1::dcb_server::DcbServer::new(self)
385 }
386
387 fn enforce_api_key(&self, metadata: &tonic::metadata::MetadataMap) -> Result<(), Status> {
388 if let Some(expected) = &self.api_key {
389 let auth = metadata.get("authorization");
390 let expected_val = format!("Bearer {}", expected);
391 let ok = auth
392 .and_then(|m| m.to_str().ok())
393 .map(|s| s == expected_val)
394 .unwrap_or(false);
395 if !ok {
396 return Err(status_from_dcb_error(DcbError::AuthenticationError(
397 "missing or invalid API key".to_string(),
398 )));
399 }
400 }
401 Ok(())
402 }
403}
404
#[tonic::async_trait]
impl umadb_proto::v1::dcb_server::Dcb for DcbServer {
    // Boxed response streams for the two server-streaming RPCs.
    type ReadStream =
        Pin<Box<dyn Stream<Item = Result<umadb_proto::v1::ReadResponse, Status>> + Send + 'static>>;
    type SubscribeStream = Pin<
        Box<dyn Stream<Item = Result<umadb_proto::v1::SubscribeResponse, Status>> + Send + 'static>,
    >;

    /// Stream events matching an optional query, in batches.
    ///
    /// Non-subscribe reads snapshot the head at stream start and never emit
    /// events past it, so the stream terminates. With `subscribe = true` the
    /// stream tails the log, waiting on head changes when it catches up.
    async fn read(
        &self,
        request: Request<umadb_proto::v1::ReadRequest>,
    ) -> Result<Response<Self::ReadStream>, Status> {
        self.enforce_api_key(request.metadata())?;
        let read_request = request.into_inner();

        let mut query: Option<DcbQuery> = read_request.query.map(|q| q.into());
        let start = read_request.start;
        let backwards = read_request.backwards.unwrap_or(false);
        let limit = read_request.limit;
        // Client-requested batch size, clamped to [1, READ_RESPONSE_BATCH_SIZE_MAX].
        let batch_size = read_request
            .batch_size
            .unwrap_or(READ_RESPONSE_BATCH_SIZE_DEFAULT)
            .clamp(1, READ_RESPONSE_BATCH_SIZE_MAX);
        let subscribe = read_request.subscribe.unwrap_or(false);

        let (tx, rx) = mpsc::channel(2048);
        let request_handler = self.request_handler.clone();
        let mut shutdown_watch_rx = self.shutdown_watch_rx.clone();

        // Producer task: repeatedly reads batches and pushes them into `tx`;
        // the returned stream is the receiving end.
        tokio::spawn(async move {
            let query_clone = query.take();
            let mut next_start = start;
            let mut sent_any = false;
            // u32::MAX stands in for "no limit".
            let mut remaining_limit = limit.unwrap_or(u32::MAX);
            let mut head_rx = request_handler.watch_head();
            // For finite reads, freeze the head now so concurrently appended
            // events are excluded and the stream can terminate.
            let captured_head = if !subscribe {
                request_handler.head().await.unwrap_or(None)
            } else {
                None
            };
            loop {
                if subscribe {
                    // Stop tailing if the client went away or we are shutting down.
                    if tx.is_closed() {
                        break;
                    }
                    if *shutdown_watch_rx.borrow() {
                        break;
                    }
                }
                let read_limit = remaining_limit.min(batch_size);
                if subscribe && limit.is_some() && remaining_limit == 0 {
                    break;
                }
                match request_handler
                    .read(query_clone.clone(), next_start, backwards, Some(read_limit))
                    .await
                {
                    Ok((dcb_sequenced_events, head)) => {
                        let original_len = dcb_sequenced_events.len();

                        // Drop events beyond the captured head (finite reads only).
                        let sequenced_event_protos: Vec<umadb_proto::v1::SequencedEvent> =
                            dcb_sequenced_events
                                .into_iter()
                                .filter(|e| {
                                    if let Some(h) = captured_head {
                                        e.position <= h
                                    } else {
                                        true
                                    }
                                })
                                .map(umadb_proto::v1::SequencedEvent::from)
                                .collect();

                        // If the filter removed anything, we have walked past
                        // the snapshot head and this is the final batch.
                        let reached_captured_head = if captured_head.is_some() {
                            sequenced_event_protos.len() < original_len
                        } else {
                            false
                        };

                        let last_event_position = sequenced_event_protos.last().map(|e| e.position);
                        // Head semantics differ by mode: live head for
                        // subscriptions, snapshot head for unlimited reads,
                        // last-delivered position for limited reads.
                        let head_to_send = if subscribe {
                            head
                        } else if limit.is_none() {
                            captured_head
                        } else {
                            last_event_position.or(head)
                        };

                        if sequenced_event_protos.is_empty() {
                            // Guarantee at least one response so the client
                            // always learns the head, even for empty results.
                            if !sent_any {
                                let response = umadb_proto::v1::ReadResponse {
                                    events: vec![],
                                    head: head_to_send,
                                };
                                let _ = tx.send(Ok(response)).await;
                            }
                            if subscribe {
                                // Caught up: sleep until new events, shutdown,
                                // or client disconnect.
                                tokio::select! {
                                    _ = head_rx.changed() => {},
                                    _ = shutdown_watch_rx.changed() => {},
                                    _ = tx.closed() => {},
                                }
                                continue;
                            }
                            break;
                        }

                        let sent_count = sequenced_event_protos.len() as u32;

                        let response = umadb_proto::v1::ReadResponse {
                            events: sequenced_event_protos,
                            head: head_to_send,
                        };

                        if tx.send(Ok(response)).await.is_err() {
                            break;
                        }
                        sent_any = true;

                        // Advance the cursor past the last delivered event.
                        // NOTE(review): `p - 1` assumes positions are >= 1 when
                        // reading backwards (head 0 maps to None elsewhere) —
                        // confirm; p == 0 would underflow.
                        next_start =
                            last_event_position.map(|p| if !backwards { p + 1 } else { p - 1 });

                        if reached_captured_head && !subscribe {
                            break;
                        }

                        if limit.is_some() {
                            if remaining_limit <= sent_count {
                                remaining_limit = 0;
                            } else {
                                remaining_limit -= sent_count;
                            }
                            if remaining_limit == 0 {
                                break;
                            }
                        }

                        // Be a good citizen on the runtime between batches.
                        tokio::task::yield_now().await;
                    }
                    Err(e) => {
                        let _ = tx.send(Err(status_from_dcb_error(e))).await;
                        break;
                    }
                }
            }
        });

        Ok(Response::new(
            Box::pin(ReceiverStream::new(rx)) as Self::ReadStream
        ))
    }

    /// Tail the event log forward from `after`, streaming batches forever
    /// (until shutdown or client disconnect). Unlike `read`, no head is
    /// reported and no limit applies.
    async fn subscribe(
        &self,
        request: Request<umadb_proto::v1::SubscribeRequest>,
    ) -> Result<Response<Self::SubscribeStream>, Status> {
        self.enforce_api_key(request.metadata())?;
        let subscribe_request = request.into_inner();

        let mut query: Option<DcbQuery> = subscribe_request.query.map(|q| q.into());
        let after = subscribe_request.after;
        let batch_size = subscribe_request
            .batch_size
            .unwrap_or(READ_RESPONSE_BATCH_SIZE_DEFAULT)
            .clamp(1, READ_RESPONSE_BATCH_SIZE_MAX);

        let (tx, rx) = mpsc::channel(2048);
        let request_handler = self.request_handler.clone();
        let mut shutdown_watch_rx = self.shutdown_watch_rx.clone();

        tokio::spawn(async move {
            let query_clone = query.take();
            // "after" is exclusive: start reading at after + 1.
            let mut next_after = after.map(|a| a.saturating_add(1));
            let mut head_rx = request_handler.watch_head();

            loop {
                if tx.is_closed() {
                    break;
                }
                if *shutdown_watch_rx.borrow() {
                    break;
                }

                match request_handler
                    .read(query_clone.clone(), next_after, false, Some(batch_size))
                    .await
                {
                    Ok((dcb_sequenced_events, _head)) => {
                        let sequenced_event_protos: Vec<umadb_proto::v1::SequencedEvent> =
                            dcb_sequenced_events
                                .into_iter()
                                .map(umadb_proto::v1::SequencedEvent::from)
                                .collect();

                        if sequenced_event_protos.is_empty() {
                            // Caught up: wait for new events, shutdown, or
                            // client disconnect before polling again.
                            tokio::select! {
                                _ = head_rx.changed() => {},
                                _ = shutdown_watch_rx.changed() => {},
                                _ = tx.closed() => {},
                            }
                            continue;
                        }

                        let last_event_position = sequenced_event_protos.last().map(|e| e.position);

                        let response = umadb_proto::v1::SubscribeResponse {
                            events: sequenced_event_protos,
                        };

                        if tx.send(Ok(response)).await.is_err() {
                            break;
                        }

                        // Resume just past the last delivered event.
                        next_after = last_event_position.map(|p| p.saturating_add(1));

                        tokio::task::yield_now().await;
                    }
                    Err(e) => {
                        let _ = tx.send(Err(status_from_dcb_error(e))).await;
                        break;
                    }
                }
            }
        });

        Ok(Response::new(
            Box::pin(ReceiverStream::new(rx)) as Self::SubscribeStream
        ))
    }

    /// Append a batch of events, optionally guarded by a condition and
    /// tagged with source tracking info. Returns the commit position.
    async fn append(
        &self,
        request: Request<umadb_proto::v1::AppendRequest>,
    ) -> Result<Response<umadb_proto::v1::AppendResponse>, Status> {
        self.enforce_api_key(request.metadata())?;
        let req = request.into_inner();

        // Convert proto events, failing fast on the first invalid one.
        let events: Vec<DcbEvent> = match req.events.into_iter().map(|e| e.try_into()).collect() {
            Ok(events) => events,
            Err(e) => {
                return Err(status_from_dcb_error(e));
            }
        };
        let condition = req.condition.map(|c| c.into());

        match self
            .request_handler
            .append(
                events,
                condition,
                req.tracking_info.map(|t| TrackingInfo {
                    source: t.source,
                    position: t.position,
                }),
            )
            .await
        {
            Ok(position) => Ok(Response::new(umadb_proto::v1::AppendResponse { position })),
            Err(e) => Err(status_from_dcb_error(e)),
        }
    }

    /// Return the current head position (None when the store is empty).
    async fn head(
        &self,
        request: Request<umadb_proto::v1::HeadRequest>,
    ) -> Result<Response<umadb_proto::v1::HeadResponse>, Status> {
        self.enforce_api_key(request.metadata())?;
        match self.request_handler.head().await {
            Ok(position) => {
                Ok(Response::new(umadb_proto::v1::HeadResponse { position }))
            }
            Err(e) => Err(status_from_dcb_error(e)),
        }
    }

    /// Look up the last recorded position for an upstream `source`.
    async fn get_tracking_info(
        &self,
        request: Request<umadb_proto::v1::TrackingRequest>,
    ) -> Result<Response<umadb_proto::v1::TrackingResponse>, Status> {
        self.enforce_api_key(request.metadata())?;
        let req = request.into_inner();
        match self.request_handler.get_tracking_info(req.source).await {
            Ok(position) => Ok(Response::new(umadb_proto::v1::TrackingResponse {
                position,
            })),
            Err(e) => Err(status_from_dcb_error(e)),
        }
    }
}
751
/// Messages sent to the dedicated writer thread.
enum WriterRequest {
    // One append request: the events, optional guard condition and tracking
    // info, plus the oneshot on which the commit result is returned.
    Append {
        events: Vec<DcbEvent>,
        condition: Option<DcbAppendCondition>,
        tracking_info: Option<TrackingInfo>,
        response_tx: oneshot::Sender<DcbResult<u64>>,
    },
    // Ask the writer thread to exit its loop.
    Shutdown,
}
762
/// Cheap-to-clone handle bundling the MVCC store, the head broadcast channel,
/// and the channel into the single writer thread.
struct RequestHandler {
    // Shared MVCC storage engine; reads go through it directly.
    mvcc: Arc<Mvcc>,
    // Broadcasts the committed head position after each successful commit.
    head_watch_tx: watch::Sender<Option<u64>>,
    // Sends append/shutdown requests to the writer thread.
    writer_request_tx: mpsc::Sender<WriterRequest>,
}
769
impl RequestHandler {
    /// Open the database at `path` (joining the default filename if `path` is
    /// a directory) and spawn the single writer thread that serializes all
    /// appends, opportunistically batching queued requests into one commit.
    fn new<P: AsRef<Path> + Send + 'static>(path: P) -> DcbResult<Self> {
        let (request_tx, mut request_rx) = mpsc::channel::<WriterRequest>(1024);

        let p = path.as_ref();
        let file_path = if p.is_dir() {
            p.join(DEFAULT_DB_FILENAME)
        } else {
            p.to_path_buf()
        };
        let mvcc = Arc::new(Mvcc::new(&file_path, DEFAULT_PAGE_SIZE, false)?);

        // Seed the head watch channel from the on-disk header; position 0
        // means "empty store" and is represented as None.
        let init_head = {
            let (_, header) = mvcc.get_latest_header()?;
            let last = header.next_position.0.saturating_sub(1);
            if last == 0 { None } else { Some(last) }
        };
        let (head_tx, _head_rx) = watch::channel::<Option<u64>>(init_head);

        let mvcc_for_writer = mvcc.clone();
        let head_tx_writer = head_tx.clone();
        // Dedicated OS thread with its own runtime: all writes are serialized
        // here, so no write locks are needed elsewhere.
        thread::spawn(move || {
            let db = UmaDb::from_arc(mvcc_for_writer);

            let rt = Runtime::new().unwrap();

            rt.block_on(async {
                while let Some(request) = request_rx.recv().await {
                    match request {
                        WriterRequest::Append {
                            events,
                            condition,
                            tracking_info,
                            response_tx,
                        } => {
                            let mut total_events = 0;
                            total_events += events.len();
                            let mvcc = &db.mvcc;
                            let mut writer = match mvcc.writer() {
                                Ok(writer) => writer,
                                Err(err) => {
                                    let _ = response_tx.send(Err(err));
                                    continue;
                                }
                            };

                            // Per-batch bookkeeping: one responder and one
                            // result per coalesced append request.
                            let mut responders: Vec<oneshot::Sender<DcbResult<u64>>> = Vec::new();
                            let mut results: Vec<DcbResult<u64>> = Vec::new();

                            // A non-integrity error aborts the whole batch;
                            // remember which request caused it and why.
                            let mut abort_idx: Option<usize> = None;
                            let mut abort_err: Option<DcbError> = None;

                            responders.push(response_tx);
                            let result = UmaDb::process_append_request(
                                events,
                                condition,
                                tracking_info,
                                mvcc,
                                &mut writer,
                            );
                            match &result {
                                Ok(_) => results.push(result),
                                // Integrity failures are per-request: record
                                // the error but keep the batch going.
                                Err(e) if is_integrity_error(e) => {
                                    results.push(Err(clone_dcb_error(e)))
                                }
                                Err(e) => {
                                    abort_idx = Some(0);
                                    abort_err = Some(clone_dcb_error(e));
                                    results.push(Err(clone_dcb_error(e)));
                                }
                            }

                            // Opportunistically drain queued appends into the
                            // same writer transaction, up to the event cap.
                            loop {
                                if total_events >= APPEND_BATCH_MAX_EVENTS {
                                    break;
                                }
                                if abort_idx.is_some() {
                                    break;
                                }
                                match request_rx.try_recv() {
                                    Ok(WriterRequest::Append {
                                        events,
                                        condition,
                                        tracking_info,
                                        response_tx,
                                    }) => {
                                        let ev_len = events.len();
                                        let idx_in_batch = responders.len();
                                        responders.push(response_tx);
                                        let res_next = UmaDb::process_append_request(
                                            events,
                                            condition,
                                            tracking_info,
                                            mvcc,
                                            &mut writer,
                                        );
                                        match &res_next {
                                            Ok(_) => results.push(res_next),
                                            Err(e) if is_integrity_error(e) => {
                                                results.push(Err(clone_dcb_error(e)))
                                            }
                                            Err(e) => {
                                                abort_idx = Some(idx_in_batch);
                                                abort_err = Some(clone_dcb_error(e));
                                                results.push(Err(clone_dcb_error(e)));
                                            }
                                        }
                                        total_events += ev_len;
                                    }
                                    Ok(WriterRequest::Shutdown) => {
                                        break;
                                    }
                                    Err(mpsc::error::TryRecvError::Empty) => {
                                        break;
                                    }
                                    Err(mpsc::error::TryRecvError::Disconnected) => break,
                                }
                            }
                            // Batch abort: nothing is committed. The failing
                            // request gets the original error; its batch-mates
                            // get a "shadow" error explaining the abort.
                            if let (Some(failed_at), Some(orig_err)) = (abort_idx, abort_err) {
                                let shadow = shadow_for_batch_abort(&orig_err);
                                for (i, tx) in responders.into_iter().enumerate() {
                                    if i == failed_at {
                                        let _ = tx.send(Err(clone_dcb_error(&orig_err)));
                                    } else {
                                        let _ = tx.send(Err(clone_dcb_error(&shadow)));
                                    }
                                }
                                continue;
                            }

                            // Commit the whole batch atomically.
                            let batch_result = match mvcc.commit(&mut writer) {
                                Ok(_) => Ok(results),
                                Err(err) => Err(err),
                            };

                            match batch_result {
                                Ok(results) => {
                                    // Deliver each per-request result, then
                                    // broadcast the new head to watchers.
                                    for (res, tx) in results.into_iter().zip(responders.into_iter())
                                    {
                                        let _ = tx.send(res);
                                    }
                                    let last_committed = writer.next_position.0.saturating_sub(1);
                                    let new_head = if last_committed == 0 {
                                        None
                                    } else {
                                        Some(last_committed)
                                    };
                                    let _ = head_tx_writer.send(new_head);
                                }
                                Err(e) => {
                                    // Commit failed: every request in the
                                    // batch receives a clone of the error.
                                    let total = responders.len();
                                    let mut iter = responders.into_iter();
                                    for _ in 0..total {
                                        if let Some(tx) = iter.next() {
                                            let _ = tx.send(Err(clone_dcb_error(&e)));
                                        }
                                    }
                                }
                            }
                        }
                        WriterRequest::Shutdown => {
                            break;
                        }
                    }
                }
            });
        });

        Ok(Self {
            mvcc,
            head_watch_tx: head_tx,
            writer_request_tx: request_tx,
        })
    }

    /// Read up to `limit` events matching `query` from `start` (forwards or
    /// backwards), against a consistent MVCC snapshot. Returns the events and
    /// a head value: the snapshot head when unlimited, otherwise the position
    /// of the last returned event.
    async fn read(
        &self,
        query: Option<DcbQuery>,
        start: Option<u64>,
        backwards: bool,
        limit: Option<u32>,
    ) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)> {
        let reader = self.mvcc.reader()?;
        let last_committed_position = reader.next_position.0.saturating_sub(1);

        // An absent query matches everything (empty item list).
        let q = query.unwrap_or(DcbQuery { items: vec![] });
        let start_position = start.map(Position);

        let events = read_conditional(
            &self.mvcc,
            // No uncommitted writer state to overlay for plain reads.
            &std::collections::HashMap::new(),
            reader.events_tree_root_id,
            reader.tags_tree_root_id,
            q,
            start_position,
            backwards,
            limit,
            false,
        )
        .map_err(|e| DcbError::Corruption(format!("{e}")))?;

        let head = if limit.is_none() {
            if last_committed_position == 0 {
                None
            } else {
                Some(last_committed_position)
            }
        } else {
            events.last().map(|e| e.position)
        };

        Ok((events, head))
    }

    /// Current committed head position from the latest on-disk header;
    /// None when the store is empty (next position is 1 or 0).
    async fn head(&self) -> DcbResult<Option<u64>> {
        let (_, header) = self
            .mvcc
            .get_latest_header()
            .map_err(|e| DcbError::Corruption(format!("{e}")))?;
        let last = header.next_position.0.saturating_sub(1);
        if last == 0 { Ok(None) } else { Ok(Some(last)) }
    }

    /// Last recorded position for an upstream `source`, if any.
    async fn get_tracking_info(&self, source: String) -> DcbResult<Option<u64>> {
        let db = UmaDb::from_arc(self.mvcc.clone());
        db.get_tracking_info(&source)
    }

    /// Append events via the writer thread.
    ///
    /// When a condition is supplied, a pre-flight check against a read
    /// snapshot either (a) detects that this exact request was already
    /// appended (idempotent retry) and returns its position without writing,
    /// (b) fails with an integrity error if conflicting events matched, or
    /// (c) bumps `condition.after` to the snapshot head so the writer thread
    /// only has to re-check the window appended since this snapshot.
    pub async fn append(
        &self,
        events: Vec<DcbEvent>,
        condition: Option<DcbAppendCondition>,
        tracking_info: Option<TrackingInfo>,
    ) -> DcbResult<u64> {
        let pre_append_decision = if let Some(mut given_condition) = condition {
            let reader = self.mvcc.reader()?;
            let current_head = {
                let last = reader.next_position.0.saturating_sub(1);
                if last == 0 { None } else { Some(last) }
            };

            // `after` is exclusive: the conflict window starts at after + 1.
            let from = given_condition.after.map(|after| Position(after + 1));
            let empty_dirty = std::collections::HashMap::new();
            // Only one match is needed to know the condition is violated.
            let found = read_conditional(
                &self.mvcc,
                &empty_dirty,
                reader.events_tree_root_id,
                reader.tags_tree_root_id,
                given_condition.fail_if_events_match.clone(),
                from,
                false,
                Some(1),
                false,
            )?;

            if let Some(matched) = found.first() {
                // Condition matched: distinguish "this same request already
                // landed" (idempotent success) from a genuine conflict.
                match is_request_idempotent(
                    &self.mvcc,
                    &empty_dirty,
                    reader.events_tree_root_id,
                    reader.tags_tree_root_id,
                    &events,
                    given_condition.fail_if_events_match.clone(),
                    from,
                ) {
                    Ok(Some(last_recorded_position)) => {
                        PreAppendDecision::AlreadyAppended(last_recorded_position)
                    }
                    Ok(None) => {
                        let msg = format!(
                            "condition: {:?} matched: {:?}",
                            given_condition.clone(),
                            matched,
                        );
                        return Err(DcbError::IntegrityError(msg));
                    }
                    Err(err) => {
                        return Err(err);
                    }
                }
            } else {
                // No conflict up to this snapshot: advance `after` to the
                // snapshot head so the writer re-checks only newer events.
                let new_after = std::cmp::max(
                    given_condition.after.unwrap_or(0),
                    current_head.unwrap_or(0),
                );
                given_condition.after = Some(new_after);

                PreAppendDecision::UseCondition(Some(given_condition))
            }
        } else {
            PreAppendDecision::UseCondition(None)
        };

        match pre_append_decision {
            PreAppendDecision::AlreadyAppended(last_found_position) => {
                Ok(last_found_position)
            }
            PreAppendDecision::UseCondition(adjusted_condition) => {
                let (response_tx, response_rx) = oneshot::channel();

                self.writer_request_tx
                    .send(WriterRequest::Append {
                        events,
                        condition: adjusted_condition,
                        tracking_info,
                        response_tx,
                    })
                    .await
                    .map_err(|_| {
                        DcbError::Io(std::io::Error::other(
                            "failed to send append request to EventStore thread",
                        ))
                    })?;

                response_rx.await.map_err(|_| {
                    DcbError::Io(std::io::Error::other(
                        "failed to receive append response from EventStore thread",
                    ))
                })?
            }
        }
    }

    /// Subscribe to head-position changes (fires after each commit).
    fn watch_head(&self) -> watch::Receiver<Option<u64>> {
        self.head_watch_tx.subscribe()
    }

    /// Ask the writer thread to exit; best-effort (errors ignored).
    #[allow(dead_code)]
    async fn shutdown(&self) {
        let _ = self.writer_request_tx.send(WriterRequest::Shutdown).await;
    }
}
1146
1147impl Clone for RequestHandler {
1149 fn clone(&self) -> Self {
1150 Self {
1151 mvcc: self.mvcc.clone(),
1152 head_watch_tx: self.head_watch_tx.clone(),
1153 writer_request_tx: self.writer_request_tx.clone(),
1154 }
1155 }
1156}
1157
/// Outcome of the pre-flight condition check in `RequestHandler::append`.
#[derive(Debug)]
enum PreAppendDecision {
    // Proceed with the append, using the (possibly adjusted) condition.
    UseCondition(Option<DcbAppendCondition>),
    // The identical request was already recorded; return its position
    // without writing again (idempotent retry).
    AlreadyAppended(u64),
}