1use futures::Stream;
2use std::fs;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::LazyLock;
7use std::thread;
8use std::time::Instant;
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio_stream::wrappers::ReceiverStream;
11use tonic::transport::{Identity, ServerTlsConfig};
12use tonic::{Request, Response, Status, transport::Server};
13use umadb_core::db::{
14 DEFAULT_DB_FILENAME, DEFAULT_PAGE_SIZE, UmaDB, clone_dcb_error, is_integrity_error,
15 is_request_idempotent, read_conditional, shadow_for_batch_abort,
16};
17use umadb_core::mvcc::Mvcc;
18use umadb_dcb::{
19 DCBAppendCondition, DCBError, DCBEvent, DCBQuery, DCBResult, DCBSequencedEvent, TrackingInfo,
20};
21
22use tokio::runtime::Runtime;
23use tonic::codegen::http;
24use tonic::transport::server::TcpIncoming;
25use umadb_core::common::Position;
26
27use std::convert::Infallible;
28use std::future::Future;
29use std::task::{Context, Poll};
30use tonic::server::NamedService;
31use umadb_proto::status_from_dcb_error;
32
/// Tower middleware that rewrites legacy gRPC request paths
/// (`/umadb.UmaDBService/...`) to the current service path
/// (`/umadb.v1.DCB/...`) before delegating to the wrapped service.
#[derive(Clone, Debug)]
pub struct PathRewriterService<S> {
    // The wrapped inner service that receives the (possibly rewritten) request.
    inner: S,
}
38
39impl<S> tower::Service<http::Request<tonic::body::Body>> for PathRewriterService<S>
40where
41 S: tower::Service<
42 http::Request<tonic::body::Body>,
43 Response = http::Response<tonic::body::Body>,
44 Error = Infallible,
45 > + Clone
46 + Send
47 + 'static,
48 S::Future: Send + 'static,
49{
50 type Response = S::Response;
51 type Error = S::Error;
52 type Future =
53 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
54
55 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
56 self.inner.poll_ready(cx)
57 }
58
59 fn call(&mut self, mut req: http::Request<tonic::body::Body>) -> Self::Future {
60 let uri = req.uri().clone();
61 let path = uri.path();
62
63 if path.starts_with("/umadb.UmaDBService/") {
65 let new_path_str = path.replace("/umadb.UmaDBService/", "/umadb.v1.DCB/");
66
67 let new_uri = if let (Some(scheme), Some(authority)) = (uri.scheme(), uri.authority()) {
70 http::Uri::builder()
72 .scheme(scheme.clone())
73 .authority(authority.clone())
74 .path_and_query(new_path_str.as_str())
75 .build()
76 .ok() } else {
78 new_path_str.parse::<http::Uri>().ok()
81 };
82
83 if let Some(final_uri) = new_uri {
84 *req.uri_mut() = final_uri;
85 } else {
86 eprintln!("Failed to construct valid URI for path: {}", path);
87 }
88 }
89
90 let fut = self.inner.call(req);
91 Box::pin(fut)
92 }
93}
94
/// Forward the gRPC service name of the wrapped service so routing still
/// recognises this middleware as the inner named service.
impl<S: NamedService> NamedService for PathRewriterService<S> {
    const NAME: &'static str = S::NAME;
}
99
/// Tower layer that wraps a service in [`PathRewriterService`].
#[derive(Clone, Debug)]
pub struct PathRewriterLayer;
102
103impl<S> tower::Layer<S> for PathRewriterLayer
104where
105 S: tower::Service<
106 http::Request<tonic::body::Body>,
107 Response = http::Response<tonic::body::Body>,
108 Error = Infallible,
109 > + Clone
110 + Send
111 + 'static,
112 S::Future: Send + 'static,
113{
114 type Service = PathRewriterService<S>;
115
116 fn layer(&self, inner: S) -> Self::Service {
117 PathRewriterService { inner }
118 }
119}
120
/// Process start marker, initialised lazily on first access; used by [`uptime`].
static START_TIME: LazyLock<Instant> = LazyLock::new(Instant::now);

/// Maximum number of events coalesced into a single writer batch before commit.
const APPEND_BATCH_MAX_EVENTS: usize = 2000;
/// Events per streamed `ReadResponse` when the client specifies no batch size.
const READ_RESPONSE_BATCH_SIZE_DEFAULT: u32 = 100;
/// Upper bound applied to client-requested read batch sizes.
const READ_RESPONSE_BATCH_SIZE_MAX: u32 = 5000;
126
/// PEM-encoded TLS identity (certificate chain and private key) for the server.
#[derive(Clone, Debug)]
pub struct ServerTlsOptions {
    /// PEM-encoded certificate (chain).
    pub cert_pem: Vec<u8>,
    /// PEM-encoded private key.
    pub key_pem: Vec<u8>,
}
133
134pub fn uptime() -> std::time::Duration {
135 START_TIME.elapsed()
136}
137
138fn build_server_builder_with_options(tls: Option<ServerTlsOptions>) -> Server {
139 use std::time::Duration;
140 let mut server_builder = Server::builder()
141 .http2_keepalive_interval(Some(Duration::from_secs(5)))
142 .http2_keepalive_timeout(Some(Duration::from_secs(10)))
143 .initial_stream_window_size(Some(4 * 1024 * 1024))
144 .initial_connection_window_size(Some(8 * 1024 * 1024))
145 .tcp_nodelay(true)
146 .concurrency_limit_per_connection(1024);
147
148 if let Some(opts) = tls {
149 let identity = Identity::from_pem(opts.cert_pem, opts.key_pem);
150 server_builder = server_builder
151 .tls_config(ServerTlsConfig::new().identity(identity))
152 .expect("failed to apply TLS config");
153 }
154
155 server_builder
156}
157
/// Starts a plaintext (no TLS, no API key) UmaDB gRPC server at `addr`,
/// serving the database at `path`, until `shutdown_rx` fires.
pub async fn start_server<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None, None).await
}
166
167pub async fn start_server_secure<P: AsRef<Path> + Send + 'static>(
169 path: P,
170 addr: &str,
171 shutdown_rx: oneshot::Receiver<()>,
172 cert_pem: Vec<u8>,
173 key_pem: Vec<u8>,
174) -> Result<(), Box<dyn std::error::Error>> {
175 let tls = ServerTlsOptions { cert_pem, key_pem };
176 start_server_internal(path, addr, shutdown_rx, Some(tls), None).await
177}
178
179pub async fn start_server_secure_from_files<
181 P: AsRef<Path> + Send + 'static,
182 CP: AsRef<Path>,
183 KP: AsRef<Path>,
184>(
185 path: P,
186 addr: &str,
187 shutdown_rx: oneshot::Receiver<()>,
188 cert_path: CP,
189 key_path: KP,
190) -> Result<(), Box<dyn std::error::Error>> {
191 let cert_path_ref = cert_path.as_ref();
192 let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
193 format!(
194 "Failed to open TLS certificate file '{}': {}",
195 cert_path_ref.display(),
196 e
197 )
198 .into()
199 })?;
200
201 let key_path_ref = key_path.as_ref();
202 let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
203 format!(
204 "Failed to open TLS key file '{}': {}",
205 key_path_ref.display(),
206 e
207 )
208 .into()
209 })?;
210 start_server_secure(path, addr, shutdown_rx, cert_pem, key_pem).await
211}
212
/// Starts a plaintext UmaDB gRPC server that requires every request to carry
/// `authorization: Bearer <api_key>` metadata.
pub async fn start_server_with_api_key<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    api_key: String,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None, Some(api_key)).await
}
222
223pub async fn start_server_secure_with_api_key<P: AsRef<Path> + Send + 'static>(
225 path: P,
226 addr: &str,
227 shutdown_rx: oneshot::Receiver<()>,
228 cert_pem: Vec<u8>,
229 key_pem: Vec<u8>,
230 api_key: String,
231) -> Result<(), Box<dyn std::error::Error>> {
232 let tls = ServerTlsOptions { cert_pem, key_pem };
233 start_server_internal(path, addr, shutdown_rx, Some(tls), Some(api_key)).await
234}
235
236pub async fn start_server_secure_from_files_with_api_key<
238 P: AsRef<Path> + Send + 'static,
239 CP: AsRef<Path>,
240 KP: AsRef<Path>,
241>(
242 path: P,
243 addr: &str,
244 shutdown_rx: oneshot::Receiver<()>,
245 cert_path: CP,
246 key_path: KP,
247 api_key: String,
248) -> Result<(), Box<dyn std::error::Error>> {
249 let cert_path_ref = cert_path.as_ref();
250 let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
251 format!(
252 "Failed to open TLS certificate file '{}': {}",
253 cert_path_ref.display(),
254 e
255 )
256 .into()
257 })?;
258
259 let key_path_ref = key_path.as_ref();
260 let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
261 format!(
262 "Failed to open TLS key file '{}': {}",
263 key_path_ref.display(),
264 e
265 )
266 .into()
267 })?;
268 start_server_secure_with_api_key(path, addr, shutdown_rx, cert_pem, key_pem, api_key).await
269}
270
/// Shared startup path for all public `start_server*` variants.
///
/// Binds the TCP listener, opens the database, registers the gRPC health
/// service plus the DCB service (behind [`PathRewriterLayer`] for legacy
/// paths), and serves until `shutdown_rx` fires. On shutdown, health status
/// is flipped to NotServing and the internal writer/read tasks are notified
/// via a watch channel before the listener stops.
async fn start_server_internal<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    tls: Option<ServerTlsOptions>,
    api_key: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let addr = addr.parse()?;
    // Bind eagerly so an occupied port is reported as a startup error
    // instead of surfacing later inside `serve`.
    let incoming = match TcpIncoming::bind(addr) {
        Ok(incoming) => incoming,
        Err(err) => {
            return Err(Box::new(DCBError::InitializationError(format!(
                "Failed to bind to address {}: {}",
                addr, err
            ))));
        }
    }
    .with_nodelay(Some(true))
    .with_keepalive(Some(std::time::Duration::from_secs(60)));

    // Internal shutdown signal observed by long-lived read subscriptions.
    let (srv_shutdown_tx, srv_shutdown_rx) = watch::channel(false);
    let dcb_server =
        match DCBServer::new(path.as_ref().to_owned(), srv_shutdown_rx, api_key.clone()) {
            Ok(server) => server,
            Err(err) => {
                return Err(Box::new(err));
            }
        };

    // NOTE(review): this prints the head *position* (highest sequence
    // number), which equals the event count only if positions start at 1
    // with no gaps — TODO confirm.
    println!(
        "UmaDB has {:?} events",
        dcb_server.request_handler.head().await?.unwrap_or(0)
    );
    let tls_mode_display_str = if tls.is_some() {
        "with TLS"
    } else {
        "without TLS"
    };

    let api_key_display_str = if api_key.is_some() {
        "with API key"
    } else {
        "without API key"
    };

    // Standard gRPC health service: report Serving for both the overall
    // server ("") and the DCB service by name.
    use tonic_health::ServingStatus; let (health_reporter, health_service) = tonic_health::server::health_reporter();
    health_reporter
        .set_service_status("", ServingStatus::Serving)
        .await;
    health_reporter
        .set_service_status("umadb.v1.DCB", ServingStatus::Serving)
        .await;
    let health_reporter_for_shutdown = health_reporter.clone();

    // The path-rewriting layer applies to every service added below.
    let mut builder = build_server_builder_with_options(tls)
        .layer(PathRewriterLayer)
        .add_service(health_service);

    builder = builder.add_service(dcb_server.into_service());
    let router = builder;

    println!("UmaDB is listening on {addr} ({tls_mode_display_str}, {api_key_display_str})");
    println!("UmaDB started in {:?}", uptime());
    router
        .serve_with_incoming_shutdown(incoming, async move {
            // Wait for the caller's shutdown signal (or sender drop), then
            // flip health to NotServing before broadcasting the internal
            // shutdown flag so streaming readers wind down.
            let _ = shutdown_rx.await;
            let _ = health_reporter_for_shutdown
                .set_service_status("", ServingStatus::NotServing)
                .await;
            let _ = health_reporter_for_shutdown
                .set_service_status("umadb.v1.DCB", ServingStatus::NotServing)
                .await;
            let _ = srv_shutdown_tx.send(true);
            println!("UmaDB server shutdown complete");
        })
        .await?;

    Ok(())
}
361
/// gRPC-facing server state: the request handler that owns the database,
/// a shutdown watch shared with streaming reads, and the optional API key
/// every request must present.
pub struct DCBServer {
    // Handles reads directly and funnels appends to the single writer thread.
    pub(crate) request_handler: RequestHandler,
    // Set to `true` when the server is shutting down; ends subscriptions.
    shutdown_watch_rx: watch::Receiver<bool>,
    // When `Some`, requests must carry `authorization: Bearer <key>`.
    api_key: Option<String>,
}
368
369impl DCBServer {
370 pub fn new<P: AsRef<Path> + Send + 'static>(
371 path: P,
372 shutdown_rx: watch::Receiver<bool>,
373 api_key: Option<String>,
374 ) -> DCBResult<Self> {
375 let command_handler = RequestHandler::new(path)?;
376 Ok(Self {
377 request_handler: command_handler,
378 shutdown_watch_rx: shutdown_rx,
379 api_key,
380 })
381 }
382
383 pub fn into_service(self) -> umadb_proto::v1::dcb_server::DcbServer<Self> {
384 umadb_proto::v1::dcb_server::DcbServer::new(self)
385 }
386}
387
388#[tonic::async_trait]
389impl umadb_proto::v1::dcb_server::Dcb for DCBServer {
390 type ReadStream =
391 Pin<Box<dyn Stream<Item = Result<umadb_proto::v1::ReadResponse, Status>> + Send + 'static>>;
392
393 async fn read(
394 &self,
395 request: Request<umadb_proto::v1::ReadRequest>,
396 ) -> Result<Response<Self::ReadStream>, Status> {
397 if let Some(expected) = &self.api_key {
399 let auth = request.metadata().get("authorization");
400 let expected_val = format!("Bearer {}", expected);
401 let ok = auth
402 .and_then(|m| m.to_str().ok())
403 .map(|s| s == expected_val)
404 .unwrap_or(false);
405 if !ok {
406 return Err(status_from_dcb_error(DCBError::AuthenticationError(
407 "missing or invalid API key".to_string(),
408 )));
409 }
410 }
411 let read_request = request.into_inner();
412
413 let mut query: Option<DCBQuery> = read_request.query.map(|q| q.into());
415 let start = read_request.start;
416 let backwards = read_request.backwards.unwrap_or(false);
417 let limit = read_request.limit;
418 let batch_size = read_request
420 .batch_size
421 .unwrap_or(READ_RESPONSE_BATCH_SIZE_DEFAULT)
422 .clamp(1, READ_RESPONSE_BATCH_SIZE_MAX);
423 let subscribe = read_request.subscribe.unwrap_or(false);
424
425 let (tx, rx) = mpsc::channel(2048);
427 let request_handler = self.request_handler.clone();
429 let mut shutdown_watch_rx = self.shutdown_watch_rx.clone();
431
432 tokio::spawn(async move {
434 let query_clone = query.take();
436 let mut next_start = start;
437 let mut sent_any = false;
438 let mut remaining_limit = limit.unwrap_or(u32::MAX);
439 let mut head_rx = request_handler.watch_head();
442 let captured_head = if !subscribe {
444 request_handler.head().await.unwrap_or(None)
445 } else {
446 None
447 };
448 loop {
449 if subscribe {
452 if tx.is_closed() {
453 break;
454 }
455 if *shutdown_watch_rx.borrow() {
456 break;
457 }
458 }
459 let read_limit = remaining_limit.min(batch_size);
461 if subscribe && limit.is_some() && remaining_limit == 0 {
463 break;
464 }
465 match request_handler
466 .read(query_clone.clone(), next_start, backwards, Some(read_limit))
467 .await
468 {
469 Ok((dcb_sequenced_events, head)) => {
470 let original_len = dcb_sequenced_events.len();
472
473 let sequenced_event_protos: Vec<umadb_proto::v1::SequencedEvent> =
475 dcb_sequenced_events
476 .into_iter()
477 .filter(|e| {
478 if let Some(h) = captured_head {
479 e.position <= h
480 } else {
481 true
482 }
483 })
484 .map(umadb_proto::v1::SequencedEvent::from)
485 .collect();
486
487 let reached_captured_head = if captured_head.is_some() {
488 sequenced_event_protos.len() < original_len
490 } else {
491 false
492 };
493
494 let last_event_position = sequenced_event_protos.last().map(|e| e.position);
499 let head_to_send = if subscribe {
500 head
501 } else if limit.is_none() {
502 captured_head
503 } else {
504 last_event_position.or(head)
505 };
506
507 if sequenced_event_protos.is_empty() {
508 if !sent_any {
510 let response = umadb_proto::v1::ReadResponse {
511 events: vec![],
512 head: head_to_send,
513 };
514 let _ = tx.send(Ok(response)).await;
515 }
516 if subscribe {
518 tokio::select! {
520 _ = head_rx.changed() => {},
521 _ = shutdown_watch_rx.changed() => {},
522 _ = tx.closed() => {},
523 }
524 continue;
525 }
526 break;
527 }
528
529 let sent_count = sequenced_event_protos.len() as u32;
531
532 let response = umadb_proto::v1::ReadResponse {
533 events: sequenced_event_protos,
534 head: head_to_send,
535 };
536
537 if tx.send(Ok(response)).await.is_err() {
538 break;
539 }
540 sent_any = true;
541
542 next_start =
544 last_event_position.map(|p| if !backwards { p + 1 } else { p - 1 });
545
546 if reached_captured_head && !subscribe {
549 break;
550 }
551
552 if limit.is_some() {
554 if remaining_limit <= sent_count {
555 remaining_limit = 0;
556 } else {
557 remaining_limit -= sent_count;
558 }
559 if remaining_limit == 0 {
560 break;
561 }
562 }
563
564 tokio::task::yield_now().await;
566 }
567 Err(e) => {
568 let _ = tx.send(Err(status_from_dcb_error(e))).await;
569 break;
570 }
571 }
572 }
573 });
574
575 Ok(Response::new(
577 Box::pin(ReceiverStream::new(rx)) as Self::ReadStream
578 ))
579 }
580
581 async fn append(
582 &self,
583 request: Request<umadb_proto::v1::AppendRequest>,
584 ) -> Result<Response<umadb_proto::v1::AppendResponse>, Status> {
585 if let Some(expected) = &self.api_key {
587 let auth = request.metadata().get("authorization");
588 let expected_val = format!("Bearer {}", expected);
589 let ok = auth
590 .and_then(|m| m.to_str().ok())
591 .map(|s| s == expected_val)
592 .unwrap_or(false);
593 if !ok {
594 return Err(status_from_dcb_error(DCBError::AuthenticationError(
595 "missing or invalid API key".to_string(),
596 )));
597 }
598 }
599 let req = request.into_inner();
600
601 let events: Vec<DCBEvent> = match req.events.into_iter().map(|e| e.try_into()).collect() {
603 Ok(events) => events,
604 Err(e) => {
605 return Err(status_from_dcb_error(e));
606 }
607 };
608 let condition = req.condition.map(|c| c.into());
609
610 match self
612 .request_handler
613 .append(
614 events,
615 condition,
616 req.tracking_info.map(|t| TrackingInfo {
617 source: t.source,
618 position: t.position,
619 }),
620 )
621 .await
622 {
623 Ok(position) => Ok(Response::new(umadb_proto::v1::AppendResponse { position })),
624 Err(e) => Err(status_from_dcb_error(e)),
625 }
626 }
627
628 async fn head(
629 &self,
630 request: Request<umadb_proto::v1::HeadRequest>,
631 ) -> Result<Response<umadb_proto::v1::HeadResponse>, Status> {
632 if let Some(expected) = &self.api_key {
634 let auth = request.metadata().get("authorization");
635 let expected_val = format!("Bearer {}", expected);
636 let ok = auth
637 .and_then(|m| m.to_str().ok())
638 .map(|s| s == expected_val)
639 .unwrap_or(false);
640 if !ok {
641 return Err(status_from_dcb_error(DCBError::AuthenticationError(
642 "missing or invalid API key".to_string(),
643 )));
644 }
645 }
646 match self.request_handler.head().await {
648 Ok(position) => {
649 Ok(Response::new(umadb_proto::v1::HeadResponse { position }))
651 }
652 Err(e) => Err(status_from_dcb_error(e)),
653 }
654 }
655
656 async fn get_tracking_info(
657 &self,
658 request: Request<umadb_proto::v1::TrackingRequest>,
659 ) -> Result<Response<umadb_proto::v1::TrackingResponse>, Status> {
660 if let Some(expected) = &self.api_key {
662 let auth = request.metadata().get("authorization");
663 let expected_val = format!("Bearer {}", expected);
664 let ok = auth
665 .and_then(|m| m.to_str().ok())
666 .map(|s| s == expected_val)
667 .unwrap_or(false);
668 if !ok {
669 return Err(status_from_dcb_error(DCBError::AuthenticationError(
670 "missing or invalid API key".to_string(),
671 )));
672 }
673 }
674 let req = request.into_inner();
675 match self.request_handler.get_tracking_info(req.source).await {
676 Ok(position) => Ok(Response::new(umadb_proto::v1::TrackingResponse {
677 position,
678 })),
679 Err(e) => Err(status_from_dcb_error(e)),
680 }
681 }
682}
683
/// Messages sent to the dedicated writer thread.
enum WriterRequest {
    /// Append events (optionally conditional); the outcome is delivered
    /// through `response_tx` once the batch commits or aborts.
    Append {
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        tracking_info: Option<TrackingInfo>,
        response_tx: oneshot::Sender<DCBResult<u64>>,
    },
    /// Ask the writer thread to stop.
    Shutdown,
}
694
/// Shared handle over the store: reads go straight to MVCC snapshots,
/// writes are serialised through a channel to a single writer thread.
struct RequestHandler {
    // Shared MVCC store; reads take snapshots directly from it.
    mvcc: Arc<Mvcc>,
    // Broadcasts the head position after each committed batch.
    head_watch_tx: watch::Sender<Option<u64>>,
    // Feeds append/shutdown requests to the writer thread.
    writer_request_tx: mpsc::Sender<WriterRequest>,
}
701
impl RequestHandler {
    /// Opens the store at `path` (a directory gets the default DB filename
    /// appended) and spawns the single writer thread that serialises all
    /// appends, batching queued requests into one commit.
    fn new<P: AsRef<Path> + Send + 'static>(path: P) -> DCBResult<Self> {
        let (request_tx, mut request_rx) = mpsc::channel::<WriterRequest>(1024);

        let p = path.as_ref();
        let file_path = if p.is_dir() {
            p.join(DEFAULT_DB_FILENAME)
        } else {
            p.to_path_buf()
        };
        let mvcc = Arc::new(Mvcc::new(&file_path, DEFAULT_PAGE_SIZE, false)?);

        // Seed the head watch from the persisted header: positions appear
        // to be 1-based, so next_position - 1 == 0 means "empty store".
        let init_head = {
            let (_, header) = mvcc.get_latest_header()?;
            let last = header.next_position.0.saturating_sub(1);
            if last == 0 { None } else { Some(last) }
        };
        let (head_tx, _head_rx) = watch::channel::<Option<u64>>(init_head);

        let mvcc_for_writer = mvcc.clone();
        let head_tx_writer = head_tx.clone();
        // Dedicated OS thread with its own runtime: all writes are applied
        // here sequentially, making the writer the single point of commit.
        thread::spawn(move || {
            let db = UmaDB::from_arc(mvcc_for_writer);

            let rt = Runtime::new().unwrap();

            rt.block_on(async {
                while let Some(request) = request_rx.recv().await {
                    match request {
                        WriterRequest::Append {
                            events,
                            condition,
                            tracking_info,
                            response_tx,
                        } => {
                            let mut total_events = 0;
                            total_events += events.len();
                            let mvcc = &db.mvcc;
                            let mut writer = match mvcc.writer() {
                                Ok(writer) => writer,
                                Err(err) => {
                                    let _ = response_tx.send(Err(err));
                                    continue;
                                }
                            };

                            // Per-batch bookkeeping: one responder and one
                            // result slot per coalesced append request.
                            let mut responders: Vec<oneshot::Sender<DCBResult<u64>>> = Vec::new();
                            let mut results: Vec<DCBResult<u64>> = Vec::new();

                            // A non-integrity error anywhere aborts the whole
                            // batch; integrity errors only fail that request.
                            let mut abort_idx: Option<usize> = None;
                            let mut abort_err: Option<DCBError> = None;

                            responders.push(response_tx);
                            let result = UmaDB::process_append_request(
                                events,
                                condition,
                                tracking_info,
                                mvcc,
                                &mut writer,
                            );
                            match &result {
                                Ok(_) => results.push(result),
                                Err(e) if is_integrity_error(e) => {
                                    results.push(Err(clone_dcb_error(e)))
                                }
                                Err(e) => {
                                    abort_idx = Some(0);
                                    abort_err = Some(clone_dcb_error(e));
                                    results.push(Err(clone_dcb_error(e)));
                                }
                            }

                            // Opportunistically drain queued appends into the
                            // same commit, bounded by APPEND_BATCH_MAX_EVENTS.
                            loop {
                                if total_events >= APPEND_BATCH_MAX_EVENTS {
                                    break;
                                }
                                if abort_idx.is_some() {
                                    break;
                                }
                                match request_rx.try_recv() {
                                    Ok(WriterRequest::Append {
                                        events,
                                        condition,
                                        tracking_info,
                                        response_tx,
                                    }) => {
                                        let ev_len = events.len();
                                        let idx_in_batch = responders.len();
                                        responders.push(response_tx);
                                        let res_next = UmaDB::process_append_request(
                                            events,
                                            condition,
                                            tracking_info,
                                            mvcc,
                                            &mut writer,
                                        );
                                        match &res_next {
                                            Ok(_) => results.push(res_next),
                                            Err(e) if is_integrity_error(e) => {
                                                results.push(Err(clone_dcb_error(e)))
                                            }
                                            Err(e) => {
                                                abort_idx = Some(idx_in_batch);
                                                abort_err = Some(clone_dcb_error(e));
                                                results.push(Err(clone_dcb_error(e)));
                                            }
                                        }
                                        total_events += ev_len;
                                    }
                                    Ok(WriterRequest::Shutdown) => {
                                        // NOTE(review): a Shutdown received
                                        // mid-batch only ends batching; the
                                        // outer recv loop keeps running, so
                                        // the shutdown request is effectively
                                        // dropped — confirm this is intended.
                                        break;
                                    }
                                    Err(mpsc::error::TryRecvError::Empty) => {
                                        break;
                                    }
                                    Err(mpsc::error::TryRecvError::Disconnected) => break,
                                }
                            }
                            // Batch abort: the failing request gets the real
                            // error; everyone else gets a shadow error. The
                            // uncommitted writer is simply dropped.
                            if let (Some(failed_at), Some(orig_err)) = (abort_idx, abort_err) {
                                let shadow = shadow_for_batch_abort(&orig_err);
                                for (i, tx) in responders.into_iter().enumerate() {
                                    if i == failed_at {
                                        let _ = tx.send(Err(clone_dcb_error(&orig_err)));
                                    } else {
                                        let _ = tx.send(Err(clone_dcb_error(&shadow)));
                                    }
                                }
                                continue;
                            }

                            // Commit once for the whole batch; a commit error
                            // fails every request in it.
                            let batch_result = match mvcc.commit(&mut writer) {
                                Ok(_) => Ok(results),
                                Err(err) => Err(err),
                            };

                            match batch_result {
                                Ok(results) => {
                                    // Deliver per-request outcomes, then
                                    // publish the new head to watchers.
                                    for (res, tx) in results.into_iter().zip(responders.into_iter())
                                    {
                                        let _ = tx.send(res);
                                    }
                                    let last_committed = writer.next_position.0.saturating_sub(1);
                                    let new_head = if last_committed == 0 {
                                        None
                                    } else {
                                        Some(last_committed)
                                    };
                                    let _ = head_tx_writer.send(new_head);
                                }
                                Err(e) => {
                                    let total = responders.len();
                                    let mut iter = responders.into_iter();
                                    for _ in 0..total {
                                        if let Some(tx) = iter.next() {
                                            let _ = tx.send(Err(clone_dcb_error(&e)));
                                        }
                                    }
                                }
                            }
                        }
                        WriterRequest::Shutdown => {
                            break;
                        }
                    }
                }
            });
        });

        Ok(Self {
            mvcc,
            head_watch_tx: head_tx,
            writer_request_tx: request_tx,
        })
    }

    /// Reads events from an MVCC snapshot.
    ///
    /// Returns the matching events plus a head hint: the snapshot's last
    /// committed position when no limit is given, otherwise the position of
    /// the last returned event.
    async fn read(
        &self,
        query: Option<DCBQuery>,
        start: Option<u64>,
        backwards: bool,
        limit: Option<u32>,
    ) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
        let reader = self.mvcc.reader()?;
        let last_committed_position = reader.next_position.0.saturating_sub(1);

        // An absent query means "match everything".
        let q = query.unwrap_or(DCBQuery { items: vec![] });
        let start_position = start.map(Position);

        let events = read_conditional(
            &self.mvcc,
            &std::collections::HashMap::new(),
            reader.events_tree_root_id,
            reader.tags_tree_root_id,
            q,
            start_position,
            backwards,
            limit,
            false,
        )
        .map_err(|e| DCBError::Corruption(format!("{e}")))?;

        let head = if limit.is_none() {
            if last_committed_position == 0 {
                None
            } else {
                Some(last_committed_position)
            }
        } else {
            events.last().map(|e| e.position)
        };

        Ok((events, head))
    }

    /// Current head position from the persisted header; `None` when empty.
    async fn head(&self) -> DCBResult<Option<u64>> {
        let (_, header) = self
            .mvcc
            .get_latest_header()
            .map_err(|e| DCBError::Corruption(format!("{e}")))?;
        let last = header.next_position.0.saturating_sub(1);
        if last == 0 { Ok(None) } else { Ok(Some(last)) }
    }

    /// Last recorded tracking position for `source`, if any.
    async fn get_tracking_info(&self, source: String) -> DCBResult<Option<u64>> {
        let db = UmaDB::from_arc(self.mvcc.clone());
        db.get_tracking_info(&source)
    }

    /// Appends events, first evaluating any append condition against the
    /// current snapshot.
    ///
    /// If the condition already matches, the request is either recognised as
    /// idempotent (returning the previously recorded position) or rejected
    /// with an integrity error. Otherwise the condition's `after` is bumped
    /// to the current head before the request is forwarded to the writer
    /// thread, which re-checks only the window after that position.
    pub async fn append(
        &self,
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        tracking_info: Option<TrackingInfo>,
    ) -> DCBResult<u64> {
        let pre_append_decision = if let Some(mut given_condition) = condition {
            let reader = self.mvcc.reader()?;
            let current_head = {
                let last = reader.next_position.0.saturating_sub(1);
                if last == 0 { None } else { Some(last) }
            };

            // Scan only the window after the condition's `after` position
            // for a single conflicting match.
            let from = given_condition.after.map(|after| Position(after + 1));
            let empty_dirty = std::collections::HashMap::new();
            let found = read_conditional(
                &self.mvcc,
                &empty_dirty,
                reader.events_tree_root_id,
                reader.tags_tree_root_id,
                given_condition.fail_if_events_match.clone(),
                from,
                false,
                Some(1),
                false,
            )?;

            if let Some(matched) = found.first() {
                // A match may be this very request replayed (e.g. a retry):
                // treat it as idempotent success, otherwise fail integrity.
                match is_request_idempotent(
                    &self.mvcc,
                    &empty_dirty,
                    reader.events_tree_root_id,
                    reader.tags_tree_root_id,
                    &events,
                    given_condition.fail_if_events_match.clone(),
                    from,
                ) {
                    Ok(Some(last_recorded_position)) => {
                        PreAppendDecision::AlreadyAppended(last_recorded_position)
                    }
                    Ok(None) => {
                        let msg = format!(
                            "condition: {:?} matched: {:?}",
                            given_condition.clone(),
                            matched,
                        );
                        return Err(DCBError::IntegrityError(msg));
                    }
                    Err(err) => {
                        return Err(err);
                    }
                }
            } else {
                // No conflict up to the snapshot head: advance `after` so the
                // writer thread only re-checks events committed after it.
                let new_after = std::cmp::max(
                    given_condition.after.unwrap_or(0),
                    current_head.unwrap_or(0),
                );
                given_condition.after = Some(new_after);

                PreAppendDecision::UseCondition(Some(given_condition))
            }
        } else {
            PreAppendDecision::UseCondition(None)
        };

        match pre_append_decision {
            PreAppendDecision::AlreadyAppended(last_found_position) => {
                Ok(last_found_position)
            }
            PreAppendDecision::UseCondition(adjusted_condition) => {
                let (response_tx, response_rx) = oneshot::channel();

                self.writer_request_tx
                    .send(WriterRequest::Append {
                        events,
                        condition: adjusted_condition,
                        tracking_info,
                        response_tx,
                    })
                    .await
                    .map_err(|_| {
                        DCBError::Io(std::io::Error::other(
                            "Failed to send append request to EventStore thread",
                        ))
                    })?;

                response_rx.await.map_err(|_| {
                    DCBError::Io(std::io::Error::other(
                        "Failed to receive append response from EventStore thread",
                    ))
                })?
            }
        }
    }

    /// Subscribes to head-position updates published after each commit.
    fn watch_head(&self) -> watch::Receiver<Option<u64>> {
        self.head_watch_tx.subscribe()
    }

    /// Asks the writer thread to stop (best-effort; errors ignored).
    #[allow(dead_code)]
    async fn shutdown(&self) {
        let _ = self.writer_request_tx.send(WriterRequest::Shutdown).await;
    }
}
1078
1079impl Clone for RequestHandler {
1081 fn clone(&self) -> Self {
1082 Self {
1083 mvcc: self.mvcc.clone(),
1084 head_watch_tx: self.head_watch_tx.clone(),
1085 writer_request_tx: self.writer_request_tx.clone(),
1086 }
1087 }
1088}
1089
/// Outcome of evaluating an append condition before handing the request to
/// the writer thread.
#[derive(Debug)]
enum PreAppendDecision {
    /// Proceed with the (possibly adjusted) condition.
    UseCondition(Option<DCBAppendCondition>),
    /// The request was already recorded; return this position unchanged.
    AlreadyAppended(u64),
}