1use futures::Stream;
2use std::fs;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::LazyLock;
7use std::thread;
8use std::time::Instant;
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio_stream::wrappers::ReceiverStream;
11use tonic::transport::{Identity, ServerTlsConfig};
12use tonic::{Request, Response, Status, transport::Server};
13use umadb_core::db::{
14 DEFAULT_DB_FILENAME, DEFAULT_PAGE_SIZE, UmaDB, is_request_idempotent, read_conditional,
15};
16use umadb_core::mvcc::Mvcc;
17use umadb_dcb::{
18 DCBAppendCondition, DCBError, DCBEvent, DCBQuery, DCBResult, DCBSequencedEvent, TrackingInfo,
19};
20
21use tokio::runtime::Runtime;
22use tonic::codegen::http;
23use tonic::transport::server::TcpIncoming;
24use umadb_core::common::Position;
25
26use std::convert::Infallible;
27use std::future::Future;
28use std::task::{Context, Poll};
29use tonic::server::NamedService;
30use umadb_proto::status_from_dcb_error;
31
/// Tower middleware wrapping an inner service `S`.
///
/// The accompanying `tower::Service` implementation rewrites legacy request
/// paths before handing the request to `inner`.
#[derive(Debug, Clone)]
pub struct PathRewriterService<S> {
    /// The wrapped service every request is forwarded to.
    inner: S,
}
37
38impl<S> tower::Service<http::Request<tonic::body::Body>> for PathRewriterService<S>
39where
40 S: tower::Service<
41 http::Request<tonic::body::Body>,
42 Response = http::Response<tonic::body::Body>,
43 Error = Infallible,
44 > + Clone
45 + Send
46 + 'static,
47 S::Future: Send + 'static,
48{
49 type Response = S::Response;
50 type Error = S::Error;
51 type Future =
52 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
53
54 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
55 self.inner.poll_ready(cx)
56 }
57
58 fn call(&mut self, mut req: http::Request<tonic::body::Body>) -> Self::Future {
59 let uri = req.uri().clone();
60 let path = uri.path();
61
62 if path.starts_with("/umadb.UmaDBService/") {
64 let new_path_str = path.replace("/umadb.UmaDBService/", "/umadb.v1.DCB/");
65
66 let new_uri = if let (Some(scheme), Some(authority)) = (uri.scheme(), uri.authority()) {
69 http::Uri::builder()
71 .scheme(scheme.clone())
72 .authority(authority.clone())
73 .path_and_query(new_path_str.as_str())
74 .build()
75 .ok() } else {
77 new_path_str.parse::<http::Uri>().ok()
80 };
81
82 if let Some(final_uri) = new_uri {
83 *req.uri_mut() = final_uri;
84 } else {
85 eprintln!("Failed to construct valid URI for path: {}", path);
86 }
87 }
88
89 let fut = self.inner.call(req);
90 Box::pin(fut)
91 }
92}
93
94impl<S: NamedService> NamedService for PathRewriterService<S> {
96 const NAME: &'static str = S::NAME;
97}
98
/// Tower layer whose `layer` method wraps a service in [`PathRewriterService`].
#[derive(Debug, Clone)]
pub struct PathRewriterLayer;
101
102impl<S> tower::Layer<S> for PathRewriterLayer
103where
104 S: tower::Service<
105 http::Request<tonic::body::Body>,
106 Response = http::Response<tonic::body::Body>,
107 Error = Infallible,
108 > + Clone
109 + Send
110 + 'static,
111 S::Future: Send + 'static,
112{
113 type Service = PathRewriterService<S>;
114
115 fn layer(&self, inner: S) -> Self::Service {
116 PathRewriterService { inner }
117 }
118}
119
/// Instant captured the first time this static is dereferenced; `uptime`
/// measures elapsed time from it.
static START_TIME: LazyLock<Instant> = LazyLock::new(Instant::now);

/// Event-count threshold at which the writer stops pulling further queued
/// append requests into the current batch.
const APPEND_BATCH_MAX_EVENTS: usize = 2_000;
/// Read batch size used when a read request does not specify one.
const READ_RESPONSE_BATCH_SIZE_DEFAULT: u32 = 100;
/// Upper bound a requested read batch size is clamped to.
const READ_RESPONSE_BATCH_SIZE_MAX: u32 = 5_000;
125
/// PEM-encoded TLS material used to build the server identity.
#[derive(Debug, Clone)]
pub struct ServerTlsOptions {
    /// Certificate bytes in PEM format.
    pub cert_pem: Vec<u8>,
    /// Private key bytes in PEM format.
    pub key_pem: Vec<u8>,
}
132
133pub fn uptime() -> std::time::Duration {
134 START_TIME.elapsed()
135}
136
137fn build_server_builder_with_options(tls: Option<ServerTlsOptions>) -> Server {
138 use std::time::Duration;
139 let mut server_builder = Server::builder()
140 .http2_keepalive_interval(Some(Duration::from_secs(5)))
141 .http2_keepalive_timeout(Some(Duration::from_secs(10)))
142 .initial_stream_window_size(Some(4 * 1024 * 1024))
143 .initial_connection_window_size(Some(8 * 1024 * 1024))
144 .tcp_nodelay(true)
145 .concurrency_limit_per_connection(1024);
146
147 if let Some(opts) = tls {
148 let identity = Identity::from_pem(opts.cert_pem, opts.key_pem);
149 server_builder = server_builder
150 .tls_config(ServerTlsConfig::new().identity(identity))
151 .expect("failed to apply TLS config");
152 }
153
154 server_builder
155}
156
157pub async fn start_server<P: AsRef<Path> + Send + 'static>(
159 path: P,
160 addr: &str,
161 shutdown_rx: oneshot::Receiver<()>,
162) -> Result<(), Box<dyn std::error::Error>> {
163 start_server_internal(path, addr, shutdown_rx, None, None).await
164}
165
166pub async fn start_server_secure<P: AsRef<Path> + Send + 'static>(
168 path: P,
169 addr: &str,
170 shutdown_rx: oneshot::Receiver<()>,
171 cert_pem: Vec<u8>,
172 key_pem: Vec<u8>,
173) -> Result<(), Box<dyn std::error::Error>> {
174 let tls = ServerTlsOptions { cert_pem, key_pem };
175 start_server_internal(path, addr, shutdown_rx, Some(tls), None).await
176}
177
178pub async fn start_server_secure_from_files<
180 P: AsRef<Path> + Send + 'static,
181 CP: AsRef<Path>,
182 KP: AsRef<Path>,
183>(
184 path: P,
185 addr: &str,
186 shutdown_rx: oneshot::Receiver<()>,
187 cert_path: CP,
188 key_path: KP,
189) -> Result<(), Box<dyn std::error::Error>> {
190 let cert_path_ref = cert_path.as_ref();
191 let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
192 format!(
193 "Failed to open TLS certificate file '{}': {}",
194 cert_path_ref.display(),
195 e
196 )
197 .into()
198 })?;
199
200 let key_path_ref = key_path.as_ref();
201 let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
202 format!(
203 "Failed to open TLS key file '{}': {}",
204 key_path_ref.display(),
205 e
206 )
207 .into()
208 })?;
209 start_server_secure(path, addr, shutdown_rx, cert_pem, key_pem).await
210}
211
212pub async fn start_server_with_api_key<P: AsRef<Path> + Send + 'static>(
214 path: P,
215 addr: &str,
216 shutdown_rx: oneshot::Receiver<()>,
217 api_key: String,
218) -> Result<(), Box<dyn std::error::Error>> {
219 start_server_internal(path, addr, shutdown_rx, None, Some(api_key)).await
220}
221
222pub async fn start_server_secure_with_api_key<P: AsRef<Path> + Send + 'static>(
224 path: P,
225 addr: &str,
226 shutdown_rx: oneshot::Receiver<()>,
227 cert_pem: Vec<u8>,
228 key_pem: Vec<u8>,
229 api_key: String,
230) -> Result<(), Box<dyn std::error::Error>> {
231 let tls = ServerTlsOptions { cert_pem, key_pem };
232 start_server_internal(path, addr, shutdown_rx, Some(tls), Some(api_key)).await
233}
234
235pub async fn start_server_secure_from_files_with_api_key<
237 P: AsRef<Path> + Send + 'static,
238 CP: AsRef<Path>,
239 KP: AsRef<Path>,
240>(
241 path: P,
242 addr: &str,
243 shutdown_rx: oneshot::Receiver<()>,
244 cert_path: CP,
245 key_path: KP,
246 api_key: String,
247) -> Result<(), Box<dyn std::error::Error>> {
248 let cert_path_ref = cert_path.as_ref();
249 let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
250 format!(
251 "Failed to open TLS certificate file '{}': {}",
252 cert_path_ref.display(),
253 e
254 )
255 .into()
256 })?;
257
258 let key_path_ref = key_path.as_ref();
259 let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
260 format!(
261 "Failed to open TLS key file '{}': {}",
262 key_path_ref.display(),
263 e
264 )
265 .into()
266 })?;
267 start_server_secure_with_api_key(path, addr, shutdown_rx, cert_pem, key_pem, api_key).await
268}
269
270async fn start_server_internal<P: AsRef<Path> + Send + 'static>(
271 path: P,
272 addr: &str,
273 shutdown_rx: oneshot::Receiver<()>,
274 tls: Option<ServerTlsOptions>,
275 api_key: Option<String>,
276) -> Result<(), Box<dyn std::error::Error>> {
277 let addr = addr.parse()?;
278 let incoming = match TcpIncoming::bind(addr) {
280 Ok(incoming) => incoming,
281 Err(err) => {
282 return Err(Box::new(DCBError::InitializationError(format!(
283 "Failed to bind to address {}: {}",
284 addr, err
285 ))));
286 }
287 }
288 .with_nodelay(Some(true))
289 .with_keepalive(Some(std::time::Duration::from_secs(60)));
290
291 let (srv_shutdown_tx, srv_shutdown_rx) = watch::channel(false);
293 let dcb_server =
294 match DCBServer::new(path.as_ref().to_owned(), srv_shutdown_rx, api_key.clone()) {
295 Ok(server) => server,
296 Err(err) => {
297 return Err(Box::new(err));
298 }
299 };
300
301 println!(
302 "UmaDB has {:?} events",
303 dcb_server.request_handler.head().await?.unwrap_or(0)
304 );
305 let tls_mode_display_str = if tls.is_some() {
306 "with TLS"
307 } else {
308 "without TLS"
309 };
310
311 let api_key_display_str = if api_key.is_some() {
312 "with API key"
313 } else {
314 "without API key"
315 };
316
317 use tonic_health::ServingStatus; let (health_reporter, health_service) = tonic_health::server::health_reporter();
320 health_reporter
322 .set_service_status("", ServingStatus::Serving)
323 .await;
324 health_reporter
325 .set_service_status("umadb.v1.DCB", ServingStatus::Serving)
326 .await;
327 let health_reporter_for_shutdown = health_reporter.clone();
328
329 let mut builder = build_server_builder_with_options(tls)
331 .layer(PathRewriterLayer)
332 .add_service(health_service);
333
334 builder = builder.add_service(dcb_server.into_service());
336 let router = builder;
337
338 println!("UmaDB is listening on {addr} ({tls_mode_display_str}, {api_key_display_str})");
339 println!("UmaDB started in {:?}", uptime());
340 router
342 .serve_with_incoming_shutdown(incoming, async move {
343 let _ = shutdown_rx.await;
345 let _ = health_reporter_for_shutdown
347 .set_service_status("", ServingStatus::NotServing)
348 .await;
349 let _ = health_reporter_for_shutdown
350 .set_service_status("umadb.v1.DCB", ServingStatus::NotServing)
351 .await;
352 let _ = srv_shutdown_tx.send(true);
354 println!("UmaDB server shutdown complete");
355 })
356 .await?;
357
358 Ok(())
359}
360
/// gRPC service state for the `umadb.v1.DCB` service.
pub struct DCBServer {
    /// Handle used to run reads, appends and head queries against the database.
    pub(crate) request_handler: RequestHandler,
    /// Watch receiver checked by subscription read streams to detect shutdown.
    shutdown_watch_rx: watch::Receiver<bool>,
    /// When `Some`, requests must carry `authorization: Bearer <key>` metadata.
    api_key: Option<String>,
}
367
368impl DCBServer {
369 pub fn new<P: AsRef<Path> + Send + 'static>(
370 path: P,
371 shutdown_rx: watch::Receiver<bool>,
372 api_key: Option<String>,
373 ) -> DCBResult<Self> {
374 let command_handler = RequestHandler::new(path)?;
375 Ok(Self {
376 request_handler: command_handler,
377 shutdown_watch_rx: shutdown_rx,
378 api_key,
379 })
380 }
381
382 pub fn into_service(self) -> umadb_proto::v1::dcb_server::DcbServer<Self> {
383 umadb_proto::v1::dcb_server::DcbServer::new(self)
384 }
385}
386
387#[tonic::async_trait]
388impl umadb_proto::v1::dcb_server::Dcb for DCBServer {
389 type ReadStream =
390 Pin<Box<dyn Stream<Item = Result<umadb_proto::v1::ReadResponse, Status>> + Send + 'static>>;
391
392 async fn read(
393 &self,
394 request: Request<umadb_proto::v1::ReadRequest>,
395 ) -> Result<Response<Self::ReadStream>, Status> {
396 if let Some(expected) = &self.api_key {
398 let auth = request.metadata().get("authorization");
399 let expected_val = format!("Bearer {}", expected);
400 let ok = auth
401 .and_then(|m| m.to_str().ok())
402 .map(|s| s == expected_val)
403 .unwrap_or(false);
404 if !ok {
405 return Err(status_from_dcb_error(DCBError::AuthenticationError(
406 "missing or invalid API key".to_string(),
407 )));
408 }
409 }
410 let read_request = request.into_inner();
411
412 let mut query: Option<DCBQuery> = read_request.query.map(|q| q.into());
414 let start = read_request.start;
415 let backwards = read_request.backwards.unwrap_or(false);
416 let limit = read_request.limit;
417 let batch_size = read_request
419 .batch_size
420 .unwrap_or(READ_RESPONSE_BATCH_SIZE_DEFAULT)
421 .clamp(1, READ_RESPONSE_BATCH_SIZE_MAX);
422 let subscribe = read_request.subscribe.unwrap_or(false);
423
424 let (tx, rx) = mpsc::channel(2048);
426 let request_handler = self.request_handler.clone();
428 let mut shutdown_watch_rx = self.shutdown_watch_rx.clone();
430
431 tokio::spawn(async move {
433 let query_clone = query.take();
435 let mut next_start = start;
436 let mut sent_any = false;
437 let mut remaining_limit = limit.unwrap_or(u32::MAX);
438 let mut head_rx = request_handler.watch_head();
441 let captured_head = if !subscribe {
443 request_handler.head().await.unwrap_or(None)
444 } else {
445 None
446 };
447 loop {
448 if subscribe {
451 if tx.is_closed() {
452 break;
453 }
454 if *shutdown_watch_rx.borrow() {
455 break;
456 }
457 }
458 let read_limit = remaining_limit.min(batch_size);
460 if subscribe && limit.is_some() && remaining_limit == 0 {
462 break;
463 }
464 match request_handler
465 .read(query_clone.clone(), next_start, backwards, Some(read_limit))
466 .await
467 {
468 Ok((dcb_sequenced_events, head)) => {
469 let original_len = dcb_sequenced_events.len();
471
472 let sequenced_event_protos: Vec<umadb_proto::v1::SequencedEvent> =
474 dcb_sequenced_events
475 .into_iter()
476 .filter(|e| {
477 if let Some(h) = captured_head {
478 e.position <= h
479 } else {
480 true
481 }
482 })
483 .map(umadb_proto::v1::SequencedEvent::from)
484 .collect();
485
486 let reached_captured_head = if captured_head.is_some() {
487 sequenced_event_protos.len() < original_len
489 } else {
490 false
491 };
492
493 let last_event_position = sequenced_event_protos.last().map(|e| e.position);
498 let head_to_send = if subscribe {
499 head
500 } else if limit.is_none() {
501 captured_head
502 } else {
503 last_event_position.or(head)
504 };
505
506 if sequenced_event_protos.is_empty() {
507 if !sent_any {
509 let response = umadb_proto::v1::ReadResponse {
510 events: vec![],
511 head: head_to_send,
512 };
513 let _ = tx.send(Ok(response)).await;
514 }
515 if subscribe {
517 tokio::select! {
519 _ = head_rx.changed() => {},
520 _ = shutdown_watch_rx.changed() => {},
521 _ = tx.closed() => {},
522 }
523 continue;
524 }
525 break;
526 }
527
528 let sent_count = sequenced_event_protos.len() as u32;
530
531 let response = umadb_proto::v1::ReadResponse {
532 events: sequenced_event_protos,
533 head: head_to_send,
534 };
535
536 if tx.send(Ok(response)).await.is_err() {
537 break;
538 }
539 sent_any = true;
540
541 next_start =
543 last_event_position.map(|p| if !backwards { p + 1 } else { p - 1 });
544
545 if reached_captured_head && !subscribe {
548 break;
549 }
550
551 if limit.is_some() {
553 if remaining_limit <= sent_count {
554 remaining_limit = 0;
555 } else {
556 remaining_limit -= sent_count;
557 }
558 if remaining_limit == 0 {
559 break;
560 }
561 }
562
563 tokio::task::yield_now().await;
565 }
566 Err(e) => {
567 let _ = tx.send(Err(status_from_dcb_error(e))).await;
568 break;
569 }
570 }
571 }
572 });
573
574 Ok(Response::new(
576 Box::pin(ReceiverStream::new(rx)) as Self::ReadStream
577 ))
578 }
579
580 async fn append(
581 &self,
582 request: Request<umadb_proto::v1::AppendRequest>,
583 ) -> Result<Response<umadb_proto::v1::AppendResponse>, Status> {
584 if let Some(expected) = &self.api_key {
586 let auth = request.metadata().get("authorization");
587 let expected_val = format!("Bearer {}", expected);
588 let ok = auth
589 .and_then(|m| m.to_str().ok())
590 .map(|s| s == expected_val)
591 .unwrap_or(false);
592 if !ok {
593 return Err(status_from_dcb_error(DCBError::AuthenticationError(
594 "missing or invalid API key".to_string(),
595 )));
596 }
597 }
598 let req = request.into_inner();
599
600 let events: Vec<DCBEvent> = match req.events.into_iter().map(|e| e.try_into()).collect() {
602 Ok(events) => events,
603 Err(e) => {
604 return Err(status_from_dcb_error(e));
605 }
606 };
607 let condition = req.condition.map(|c| c.into());
608
609 match self
611 .request_handler
612 .append(
613 events,
614 condition,
615 req.tracking_info.map(|t| TrackingInfo {
616 source: t.source,
617 position: t.position,
618 }),
619 )
620 .await
621 {
622 Ok(position) => Ok(Response::new(umadb_proto::v1::AppendResponse { position })),
623 Err(e) => Err(status_from_dcb_error(e)),
624 }
625 }
626
627 async fn head(
628 &self,
629 request: Request<umadb_proto::v1::HeadRequest>,
630 ) -> Result<Response<umadb_proto::v1::HeadResponse>, Status> {
631 if let Some(expected) = &self.api_key {
633 let auth = request.metadata().get("authorization");
634 let expected_val = format!("Bearer {}", expected);
635 let ok = auth
636 .and_then(|m| m.to_str().ok())
637 .map(|s| s == expected_val)
638 .unwrap_or(false);
639 if !ok {
640 return Err(status_from_dcb_error(DCBError::AuthenticationError(
641 "missing or invalid API key".to_string(),
642 )));
643 }
644 }
645 match self.request_handler.head().await {
647 Ok(position) => {
648 Ok(Response::new(umadb_proto::v1::HeadResponse { position }))
650 }
651 Err(e) => Err(status_from_dcb_error(e)),
652 }
653 }
654
655 async fn get_tracking_info(
656 &self,
657 request: Request<umadb_proto::v1::TrackingRequest>,
658 ) -> Result<Response<umadb_proto::v1::TrackingResponse>, Status> {
659 if let Some(expected) = &self.api_key {
661 let auth = request.metadata().get("authorization");
662 let expected_val = format!("Bearer {}", expected);
663 let ok = auth
664 .and_then(|m| m.to_str().ok())
665 .map(|s| s == expected_val)
666 .unwrap_or(false);
667 if !ok {
668 return Err(status_from_dcb_error(DCBError::AuthenticationError(
669 "missing or invalid API key".to_string(),
670 )));
671 }
672 }
673 let req = request.into_inner();
674 match self.request_handler.get_tracking_info(req.source).await {
675 Ok(position) => Ok(Response::new(umadb_proto::v1::TrackingResponse {
676 position,
677 })),
678 Err(e) => Err(status_from_dcb_error(e)),
679 }
680 }
681}
682
/// Messages sent over the channel to the dedicated writer thread.
enum WriterRequest {
    /// Append `events`, subject to `condition`, and report the outcome on
    /// `response_tx`.
    Append {
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        tracking_info: Option<TrackingInfo>,
        response_tx: oneshot::Sender<DCBResult<u64>>,
    },
    /// Ask the writer thread to stop processing requests.
    Shutdown,
}
693
/// Cloneable handle to the database: reads go through `mvcc` directly, appends
/// are queued to the writer thread.
struct RequestHandler {
    /// Shared storage engine used for reads and header lookups.
    mvcc: Arc<Mvcc>,
    /// Watch sender the writer thread uses to publish the newest head position.
    head_watch_tx: watch::Sender<Option<u64>>,
    /// Queue of requests consumed by the writer thread.
    writer_request_tx: mpsc::Sender<WriterRequest>,
}
700
701impl RequestHandler {
702 fn new<P: AsRef<Path> + Send + 'static>(path: P) -> DCBResult<Self> {
703 let (request_tx, mut request_rx) = mpsc::channel::<WriterRequest>(1024);
705
706 let p = path.as_ref();
708 let file_path = if p.is_dir() {
709 p.join(DEFAULT_DB_FILENAME)
710 } else {
711 p.to_path_buf()
712 };
713 let mvcc = Arc::new(Mvcc::new(&file_path, DEFAULT_PAGE_SIZE, false)?);
714
715 let init_head = {
717 let (_, header) = mvcc.get_latest_header()?;
718 let last = header.next_position.0.saturating_sub(1);
719 if last == 0 { None } else { Some(last) }
720 };
721 let (head_tx, _head_rx) = watch::channel::<Option<u64>>(init_head);
722
723 let mvcc_for_writer = mvcc.clone();
725 let head_tx_writer = head_tx.clone();
726 thread::spawn(move || {
727 let db = UmaDB::from_arc(mvcc_for_writer);
728
729 let rt = Runtime::new().unwrap();
731
732 rt.block_on(async {
734 while let Some(request) = request_rx.recv().await {
735 match request {
736 WriterRequest::Append {
737 events,
738 condition,
739 tracking_info,
740 response_tx,
741 } => {
742 let mut total_events = 0;
747 total_events += events.len();
748 let mvcc = &db.mvcc;
751 let mut writer = match mvcc.writer() {
752 Ok(writer) => writer,
753 Err(err) => {
754 let _ = response_tx.send(Err(err));
755 continue;
756 }
757 };
758
759 let mut responders: Vec<oneshot::Sender<DCBResult<u64>>> = Vec::new();
760 let mut results: Vec<DCBResult<u64>> = Vec::new();
761
762 responders.push(response_tx);
763 let mut result = UmaDB::process_append_request(
764 events,
765 condition,
766 tracking_info,
767 mvcc,
768 &mut writer,
769 );
770 results.push(result);
771
772 loop {
776 if total_events >= APPEND_BATCH_MAX_EVENTS {
777 break;
778 }
779 match request_rx.try_recv() {
780 Ok(WriterRequest::Append {
781 events,
782 condition,
783 tracking_info,
784 response_tx,
785 }) => {
786 let ev_len = events.len();
787 responders.push(response_tx);
788 result = UmaDB::process_append_request(
789 events,
790 condition,
791 tracking_info,
792 mvcc,
793 &mut writer,
794 );
795 results.push(result);
796 total_events += ev_len;
797 }
798 Ok(WriterRequest::Shutdown) => {
799 break;
804 }
805 Err(mpsc::error::TryRecvError::Empty) => {
806 break;
807 }
808 Err(mpsc::error::TryRecvError::Disconnected) => break,
809 }
810 }
811 let batch_result = match mvcc.commit(&mut writer) {
815 Ok(_) => Ok(results),
816 Err(err) => Err(err),
817 };
818
819 match batch_result {
820 Ok(results) => {
821 let mut max_ok: Option<u64> = None;
824 for (res, tx) in results.into_iter().zip(responders.into_iter())
825 {
826 if let Ok(v) = &res {
827 max_ok = Some(max_ok.map_or(*v, |m| m.max(*v)));
828 }
829 let _ = tx.send(res);
830 }
831 if let Some(h) = max_ok {
833 let _ = head_tx_writer.send(Some(h));
834 }
835 }
836 Err(e) => {
837 let total = responders.len();
841 let mut iter = responders.into_iter();
842 for _ in 0..total {
843 if let Some(tx) = iter.next() {
844 let _ = tx.send(Err(clone_dcb_error(&e)));
845 }
846 }
847 }
848 }
849 }
850 WriterRequest::Shutdown => {
851 break;
852 }
853 }
854 }
855 });
856 });
857
858 Ok(Self {
859 mvcc,
860 head_watch_tx: head_tx,
861 writer_request_tx: request_tx,
862 })
863 }
864
865 async fn read(
866 &self,
867 query: Option<DCBQuery>,
868 start: Option<u64>,
869 backwards: bool,
870 limit: Option<u32>,
871 ) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
872 let reader = self.mvcc.reader()?;
873 let last_committed_position = reader.next_position.0.saturating_sub(1);
874
875 let q = query.unwrap_or(DCBQuery { items: vec![] });
876 let start_position = start.map(Position);
877
878 let events = read_conditional(
879 &self.mvcc,
880 &std::collections::HashMap::new(),
881 reader.events_tree_root_id,
882 reader.tags_tree_root_id,
883 q,
884 start_position,
885 backwards,
886 limit,
887 false,
888 )
889 .map_err(|e| DCBError::Corruption(format!("{e}")))?;
890
891 let head = if limit.is_none() {
892 if last_committed_position == 0 {
893 None
894 } else {
895 Some(last_committed_position)
896 }
897 } else {
898 events.last().map(|e| e.position)
899 };
900
901 Ok((events, head))
902 }
903
904 async fn head(&self) -> DCBResult<Option<u64>> {
905 let (_, header) = self
906 .mvcc
907 .get_latest_header()
908 .map_err(|e| DCBError::Corruption(format!("{e}")))?;
909 let last = header.next_position.0.saturating_sub(1);
910 if last == 0 { Ok(None) } else { Ok(Some(last)) }
911 }
912
913 async fn get_tracking_info(&self, source: String) -> DCBResult<Option<u64>> {
914 let db = UmaDB::from_arc(self.mvcc.clone());
915 db.get_tracking_info(&source)
916 }
917
918 pub async fn append(
919 &self,
920 events: Vec<DCBEvent>,
921 condition: Option<DCBAppendCondition>,
922 tracking_info: Option<TrackingInfo>,
923 ) -> DCBResult<u64> {
924 let pre_append_decision = if let Some(mut given_condition) = condition {
926 let reader = self.mvcc.reader()?;
927 let current_head = {
928 let last = reader.next_position.0.saturating_sub(1);
929 if last == 0 { None } else { Some(last) }
930 };
931
932 let from = given_condition.after.map(|after| Position(after + 1));
934 let empty_dirty = std::collections::HashMap::new();
935 let found = read_conditional(
936 &self.mvcc,
937 &empty_dirty,
938 reader.events_tree_root_id,
939 reader.tags_tree_root_id,
940 given_condition.fail_if_events_match.clone(),
941 from,
942 false,
943 Some(1),
944 false,
945 )?;
946
947 if let Some(matched) = found.first() {
948 match is_request_idempotent(
950 &self.mvcc,
951 &empty_dirty,
952 reader.events_tree_root_id,
953 reader.tags_tree_root_id,
954 &events,
955 given_condition.fail_if_events_match.clone(),
956 from,
957 ) {
958 Ok(Some(last_recorded_position)) => {
959 PreAppendDecision::AlreadyAppended(last_recorded_position)
961 }
962 Ok(None) => {
963 let msg = format!(
965 "condition: {:?} matched: {:?}",
966 given_condition.clone(),
967 matched,
968 );
969 return Err(DCBError::IntegrityError(msg));
970 }
971 Err(err) => {
972 return Err(err);
974 }
975 }
976 } else {
977 let new_after = std::cmp::max(
979 given_condition.after.unwrap_or(0),
980 current_head.unwrap_or(0),
981 );
982 given_condition.after = Some(new_after);
983
984 PreAppendDecision::UseCondition(Some(given_condition))
985 }
986 } else {
987 PreAppendDecision::UseCondition(None)
989 };
990
991 match pre_append_decision {
993 PreAppendDecision::AlreadyAppended(last_found_position) => {
994 Ok(last_found_position)
996 }
997 PreAppendDecision::UseCondition(adjusted_condition) => {
998 let (response_tx, response_rx) = oneshot::channel();
1000
1001 self.writer_request_tx
1002 .send(WriterRequest::Append {
1003 events,
1004 condition: adjusted_condition,
1005 tracking_info: tracking_info,
1006 response_tx,
1007 })
1008 .await
1009 .map_err(|_| {
1010 DCBError::Io(std::io::Error::other(
1011 "Failed to send append request to EventStore thread",
1012 ))
1013 })?;
1014
1015 response_rx.await.map_err(|_| {
1016 DCBError::Io(std::io::Error::other(
1017 "Failed to receive append response from EventStore thread",
1018 ))
1019 })?
1020 }
1021 }
1022 }
1023
1024 fn watch_head(&self) -> watch::Receiver<Option<u64>> {
1025 self.head_watch_tx.subscribe()
1026 }
1027
1028 #[allow(dead_code)]
1029 async fn shutdown(&self) {
1030 let _ = self.writer_request_tx.send(WriterRequest::Shutdown).await;
1031 }
1032}
1033
1034fn clone_dcb_error(src: &DCBError) -> DCBError {
1035 match src {
1036 DCBError::AuthenticationError(err) => DCBError::AuthenticationError(err.to_string()),
1037 DCBError::InitializationError(err) => DCBError::InitializationError(err.to_string()),
1038 DCBError::Io(err) => DCBError::Io(std::io::Error::other(err.to_string())),
1039 DCBError::IntegrityError(s) => DCBError::IntegrityError(s.clone()),
1040 DCBError::Corruption(s) => DCBError::Corruption(s.clone()),
1041 DCBError::PageNotFound(id) => DCBError::PageNotFound(*id),
1042 DCBError::DirtyPageNotFound(id) => DCBError::DirtyPageNotFound(*id),
1043 DCBError::RootIDMismatch(old_id, new_id) => DCBError::RootIDMismatch(*old_id, *new_id),
1044 DCBError::DatabaseCorrupted(s) => DCBError::DatabaseCorrupted(s.clone()),
1045 DCBError::InternalError(s) => DCBError::InternalError(s.clone()),
1046 DCBError::SerializationError(s) => DCBError::SerializationError(s.clone()),
1047 DCBError::DeserializationError(s) => DCBError::DeserializationError(s.clone()),
1048 DCBError::PageAlreadyFreed(id) => DCBError::PageAlreadyFreed(*id),
1049 DCBError::PageAlreadyDirty(id) => DCBError::PageAlreadyDirty(*id),
1050 DCBError::TransportError(err) => DCBError::TransportError(err.clone()),
1051 DCBError::CancelledByUser() => DCBError::CancelledByUser(),
1052 }
1053}
1054
1055impl Clone for RequestHandler {
1057 fn clone(&self) -> Self {
1058 Self {
1059 mvcc: self.mvcc.clone(),
1060 head_watch_tx: self.head_watch_tx.clone(),
1061 writer_request_tx: self.writer_request_tx.clone(),
1062 }
1063 }
1064}
1065
/// Outcome of the pre-check performed in `RequestHandler::append` before a
/// request is queued to the writer thread.
#[derive(Debug)]
enum PreAppendDecision {
    /// Queue the append with this (possibly adjusted) condition.
    UseCondition(Option<DCBAppendCondition>),
    /// The request was judged idempotent; return this recorded position
    /// without writing.
    AlreadyAppended(u64),
}