// umadb_server/lib.rs

1use futures::Stream;
2use std::fs;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::LazyLock;
7use std::thread;
8use std::time::Instant;
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio_stream::wrappers::ReceiverStream;
11use tonic::transport::{Identity, ServerTlsConfig};
12use tonic::{Request, Response, Status, transport::Server};
13use umadb_core::db::{
14    DEFAULT_DB_FILENAME, DEFAULT_PAGE_SIZE, UmaDB, clone_dcb_error, is_integrity_error,
15    is_request_idempotent, read_conditional, shadow_for_batch_abort,
16};
17use umadb_core::mvcc::Mvcc;
18use umadb_dcb::{
19    DCBAppendCondition, DCBError, DCBEvent, DCBQuery, DCBResult, DCBSequencedEvent, TrackingInfo,
20};
21
22use tokio::runtime::Runtime;
23use tonic::codegen::http;
24use tonic::transport::server::TcpIncoming;
25use umadb_core::common::Position;
26
27use std::convert::Infallible;
28use std::future::Future;
29use std::task::{Context, Poll};
30use tonic::server::NamedService;
31use umadb_proto::status_from_dcb_error;
32
// This is just to maintain compatibility for the very early unversioned API (pre-v1).
#[derive(Clone, Debug)]
pub struct PathRewriterService<S> {
    // The wrapped tower service that receives the (possibly rewritten) request.
    inner: S,
}
38
39impl<S> tower::Service<http::Request<tonic::body::Body>> for PathRewriterService<S>
40where
41    S: tower::Service<
42            http::Request<tonic::body::Body>,
43            Response = http::Response<tonic::body::Body>,
44            Error = Infallible,
45        > + Clone
46        + Send
47        + 'static,
48    S::Future: Send + 'static,
49{
50    type Response = S::Response;
51    type Error = S::Error;
52    type Future =
53        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
54
55    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
56        self.inner.poll_ready(cx)
57    }
58
59    fn call(&mut self, mut req: http::Request<tonic::body::Body>) -> Self::Future {
60        let uri = req.uri().clone();
61        let path = uri.path();
62
63        // Check and rewrite the path string first
64        if path.starts_with("/umadb.UmaDBService/") {
65            let new_path_str = path.replace("/umadb.UmaDBService/", "/umadb.v1.DCB/");
66
67            // Use the existing authority and scheme if present, otherwise default to a simple path-only URI structure
68            // which is often safer than hardcoded hostnames in internal systems.
69            let new_uri = if let (Some(scheme), Some(authority)) = (uri.scheme(), uri.authority()) {
70                // If we have all components, try to build the full URI
71                http::Uri::builder()
72                    .scheme(scheme.clone())
73                    .authority(authority.clone())
74                    .path_and_query(new_path_str.as_str())
75                    .build()
76                    .ok() // Convert the final build Result into an Option
77            } else {
78                // Fallback for malformed requests (missing scheme/authority)
79                // Just try to build a path-only URI
80                new_path_str.parse::<http::Uri>().ok()
81            };
82
83            if let Some(final_uri) = new_uri {
84                *req.uri_mut() = final_uri;
85            } else {
86                eprintln!("Failed to construct valid URI for path: {}", path);
87            }
88        }
89
90        let fut = self.inner.call(req);
91        Box::pin(fut)
92    }
93}
94
// Forward the inner service's gRPC service name so the wrapped service can
// still be registered with tonic's router under the correct route.
impl<S: NamedService> NamedService for PathRewriterService<S> {
    const NAME: &'static str = S::NAME;
}
99
// Tower layer that wraps a service in `PathRewriterService` (legacy pre-v1 path support).
#[derive(Clone, Debug)]
pub struct PathRewriterLayer;
102
impl<S> tower::Layer<S> for PathRewriterLayer
where
    S: tower::Service<
            http::Request<tonic::body::Body>,
            Response = http::Response<tonic::body::Body>,
            Error = Infallible,
        > + Clone
        + Send
        + 'static,
    S::Future: Send + 'static,
{
    type Service = PathRewriterService<S>;

    // Wrap the inner service; the path rewriting itself happens in `call`.
    fn layer(&self, inner: S) -> Self::Service {
        PathRewriterService { inner }
    }
}
120
// Process start reference point, initialized lazily on first access (see `uptime`).
static START_TIME: LazyLock<Instant> = LazyLock::new(Instant::now);

// Maximum number of events the writer thread coalesces into one write batch.
const APPEND_BATCH_MAX_EVENTS: usize = 2000;
// Events per streamed ReadResponse when the client doesn't specify a batch size.
const READ_RESPONSE_BATCH_SIZE_DEFAULT: u32 = 100;
// Hard cap on the client-requested per-response batch size.
const READ_RESPONSE_BATCH_SIZE_MAX: u32 = 5000;
126
// Optional TLS configuration helpers
#[derive(Clone, Debug)]
pub struct ServerTlsOptions {
    // PEM-encoded server certificate bytes.
    pub cert_pem: Vec<u8>,
    // PEM-encoded private key bytes matching `cert_pem`.
    pub key_pem: Vec<u8>,
}
133
134pub fn uptime() -> std::time::Duration {
135    START_TIME.elapsed()
136}
137
138fn build_server_builder_with_options(tls: Option<ServerTlsOptions>) -> Server {
139    use std::time::Duration;
140    let mut server_builder = Server::builder()
141        .http2_keepalive_interval(Some(Duration::from_secs(5)))
142        .http2_keepalive_timeout(Some(Duration::from_secs(10)))
143        .initial_stream_window_size(Some(4 * 1024 * 1024))
144        .initial_connection_window_size(Some(8 * 1024 * 1024))
145        .tcp_nodelay(true)
146        .concurrency_limit_per_connection(1024);
147
148    if let Some(opts) = tls {
149        let identity = Identity::from_pem(opts.cert_pem, opts.key_pem);
150        server_builder = server_builder
151            .tls_config(ServerTlsConfig::new().identity(identity))
152            .expect("failed to apply TLS config");
153    }
154
155    server_builder
156}
157
158// Function to start the gRPC server with a shutdown signal
159pub async fn start_server<P: AsRef<Path> + Send + 'static>(
160    path: P,
161    addr: &str,
162    shutdown_rx: oneshot::Receiver<()>,
163) -> Result<(), Box<dyn std::error::Error>> {
164    start_server_internal(path, addr, shutdown_rx, None, None).await
165}
166
167/// Start server with TLS using PEM-encoded cert and key.
168pub async fn start_server_secure<P: AsRef<Path> + Send + 'static>(
169    path: P,
170    addr: &str,
171    shutdown_rx: oneshot::Receiver<()>,
172    cert_pem: Vec<u8>,
173    key_pem: Vec<u8>,
174) -> Result<(), Box<dyn std::error::Error>> {
175    let tls = ServerTlsOptions { cert_pem, key_pem };
176    start_server_internal(path, addr, shutdown_rx, Some(tls), None).await
177}
178
179/// Convenience: load cert and key from filesystem paths
180pub async fn start_server_secure_from_files<
181    P: AsRef<Path> + Send + 'static,
182    CP: AsRef<Path>,
183    KP: AsRef<Path>,
184>(
185    path: P,
186    addr: &str,
187    shutdown_rx: oneshot::Receiver<()>,
188    cert_path: CP,
189    key_path: KP,
190) -> Result<(), Box<dyn std::error::Error>> {
191    let cert_path_ref = cert_path.as_ref();
192    let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
193        format!(
194            "Failed to open TLS certificate file '{}': {}",
195            cert_path_ref.display(),
196            e
197        )
198        .into()
199    })?;
200
201    let key_path_ref = key_path.as_ref();
202    let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
203        format!(
204            "Failed to open TLS key file '{}': {}",
205            key_path_ref.display(),
206            e
207        )
208        .into()
209    })?;
210    start_server_secure(path, addr, shutdown_rx, cert_pem, key_pem).await
211}
212
213/// Start server (insecure) requiring an API key for all RPCs.
214pub async fn start_server_with_api_key<P: AsRef<Path> + Send + 'static>(
215    path: P,
216    addr: &str,
217    shutdown_rx: oneshot::Receiver<()>,
218    api_key: String,
219) -> Result<(), Box<dyn std::error::Error>> {
220    start_server_internal(path, addr, shutdown_rx, None, Some(api_key)).await
221}
222
223/// Start TLS server requiring an API key for all RPCs.
224pub async fn start_server_secure_with_api_key<P: AsRef<Path> + Send + 'static>(
225    path: P,
226    addr: &str,
227    shutdown_rx: oneshot::Receiver<()>,
228    cert_pem: Vec<u8>,
229    key_pem: Vec<u8>,
230    api_key: String,
231) -> Result<(), Box<dyn std::error::Error>> {
232    let tls = ServerTlsOptions { cert_pem, key_pem };
233    start_server_internal(path, addr, shutdown_rx, Some(tls), Some(api_key)).await
234}
235
236/// TLS server from files requiring an API key
237pub async fn start_server_secure_from_files_with_api_key<
238    P: AsRef<Path> + Send + 'static,
239    CP: AsRef<Path>,
240    KP: AsRef<Path>,
241>(
242    path: P,
243    addr: &str,
244    shutdown_rx: oneshot::Receiver<()>,
245    cert_path: CP,
246    key_path: KP,
247    api_key: String,
248) -> Result<(), Box<dyn std::error::Error>> {
249    let cert_path_ref = cert_path.as_ref();
250    let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
251        format!(
252            "Failed to open TLS certificate file '{}': {}",
253            cert_path_ref.display(),
254            e
255        )
256        .into()
257    })?;
258
259    let key_path_ref = key_path.as_ref();
260    let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
261        format!(
262            "Failed to open TLS key file '{}': {}",
263            key_path_ref.display(),
264            e
265        )
266        .into()
267    })?;
268    start_server_secure_with_api_key(path, addr, shutdown_rx, cert_pem, key_pem, api_key).await
269}
270
271async fn start_server_internal<P: AsRef<Path> + Send + 'static>(
272    path: P,
273    addr: &str,
274    shutdown_rx: oneshot::Receiver<()>,
275    tls: Option<ServerTlsOptions>,
276    api_key: Option<String>,
277) -> Result<(), Box<dyn std::error::Error>> {
278    let addr = addr.parse()?;
279    // ---- Bind incoming manually like tonic ----
280    let incoming = match TcpIncoming::bind(addr) {
281        Ok(incoming) => incoming,
282        Err(err) => {
283            return Err(Box::new(DCBError::InitializationError(format!(
284                "Failed to bind to address {}: {}",
285                addr, err
286            ))));
287        }
288    }
289    .with_nodelay(Some(true))
290    .with_keepalive(Some(std::time::Duration::from_secs(60)));
291
292    // Create a shutdown broadcast channel for terminating ongoing subscriptions
293    let (srv_shutdown_tx, srv_shutdown_rx) = watch::channel(false);
294    let dcb_server =
295        match DCBServer::new(path.as_ref().to_owned(), srv_shutdown_rx, api_key.clone()) {
296            Ok(server) => server,
297            Err(err) => {
298                return Err(Box::new(err));
299            }
300        };
301
302    println!(
303        "UmaDB has {:?} events",
304        dcb_server.request_handler.head().await?.unwrap_or(0)
305    );
306    let tls_mode_display_str = if tls.is_some() {
307        "with TLS"
308    } else {
309        "without TLS"
310    };
311
312    let api_key_display_str = if api_key.is_some() {
313        "with API key"
314    } else {
315        "without API key"
316    };
317
318    // gRPC Health service setup
319    use tonic_health::ServingStatus; // server API expects this enum
320    let (health_reporter, health_service) = tonic_health::server::health_reporter();
321    // Set overall and service-specific health to SERVING
322    health_reporter
323        .set_service_status("", ServingStatus::Serving)
324        .await;
325    health_reporter
326        .set_service_status("umadb.v1.DCB", ServingStatus::Serving)
327        .await;
328    let health_reporter_for_shutdown = health_reporter.clone();
329
330    // Apply PathRewriterLayer at the server level to intercept all requests before routing
331    let mut builder = build_server_builder_with_options(tls)
332        .layer(PathRewriterLayer)
333        .add_service(health_service);
334
335    // Add DCB service (auth enforced inside RPC handlers if configured)
336    builder = builder.add_service(dcb_server.into_service());
337    let router = builder;
338
339    println!("UmaDB is listening on {addr} ({tls_mode_display_str}, {api_key_display_str})");
340    println!("UmaDB started in {:?}", uptime());
341    // let incoming = router.server.bind_incoming();
342    router
343        .serve_with_incoming_shutdown(incoming, async move {
344            // Wait for an external shutdown trigger
345            let _ = shutdown_rx.await;
346            // Mark health as NOT_SERVING before shutdown
347            let _ = health_reporter_for_shutdown
348                .set_service_status("", ServingStatus::NotServing)
349                .await;
350            let _ = health_reporter_for_shutdown
351                .set_service_status("umadb.v1.DCB", ServingStatus::NotServing)
352                .await;
353            // Broadcast shutdown to all subscription tasks
354            let _ = srv_shutdown_tx.send(true);
355            println!("UmaDB server shutdown complete");
356        })
357        .await?;
358
359    Ok(())
360}
361
// gRPC server implementation
pub struct DCBServer {
    // Serves reads and forwards writes; shared with server startup for the head printout.
    pub(crate) request_handler: RequestHandler,
    // Signals in-flight subscription streams that the server is shutting down.
    shutdown_watch_rx: watch::Receiver<bool>,
    // When set, every RPC must present `authorization: Bearer <key>`.
    api_key: Option<String>,
}
368
369impl DCBServer {
370    pub fn new<P: AsRef<Path> + Send + 'static>(
371        path: P,
372        shutdown_rx: watch::Receiver<bool>,
373        api_key: Option<String>,
374    ) -> DCBResult<Self> {
375        let command_handler = RequestHandler::new(path)?;
376        Ok(Self {
377            request_handler: command_handler,
378            shutdown_watch_rx: shutdown_rx,
379            api_key,
380        })
381    }
382
383    pub fn into_service(self) -> umadb_proto::v1::dcb_server::DcbServer<Self> {
384        umadb_proto::v1::dcb_server::DcbServer::new(self)
385    }
386}
387
388#[tonic::async_trait]
389impl umadb_proto::v1::dcb_server::Dcb for DCBServer {
390    type ReadStream =
391        Pin<Box<dyn Stream<Item = Result<umadb_proto::v1::ReadResponse, Status>> + Send + 'static>>;
392
393    async fn read(
394        &self,
395        request: Request<umadb_proto::v1::ReadRequest>,
396    ) -> Result<Response<Self::ReadStream>, Status> {
397        // Enforce API key if configured
398        if let Some(expected) = &self.api_key {
399            let auth = request.metadata().get("authorization");
400            let expected_val = format!("Bearer {}", expected);
401            let ok = auth
402                .and_then(|m| m.to_str().ok())
403                .map(|s| s == expected_val)
404                .unwrap_or(false);
405            if !ok {
406                return Err(status_from_dcb_error(DCBError::AuthenticationError(
407                    "missing or invalid API key".to_string(),
408                )));
409            }
410        }
411        let read_request = request.into_inner();
412
413        // Convert protobuf query to DCB types
414        let mut query: Option<DCBQuery> = read_request.query.map(|q| q.into());
415        let start = read_request.start;
416        let backwards = read_request.backwards.unwrap_or(false);
417        let limit = read_request.limit;
418        // Cap requested batch size.
419        let batch_size = read_request
420            .batch_size
421            .unwrap_or(READ_RESPONSE_BATCH_SIZE_DEFAULT)
422            .clamp(1, READ_RESPONSE_BATCH_SIZE_MAX);
423        let subscribe = read_request.subscribe.unwrap_or(false);
424
425        // Create a channel for streaming responses (deeper buffer to reduce backpressure under concurrency)
426        let (tx, rx) = mpsc::channel(2048);
427        // Clone the request handler.
428        let request_handler = self.request_handler.clone();
429        // Clone the shutdown watch receiver.
430        let mut shutdown_watch_rx = self.shutdown_watch_rx.clone();
431
432        // Spawn a task to handle the read operation and stream multiple batches
433        tokio::spawn(async move {
434            // Ensure we can reuse the same query across batches
435            let query_clone = query.take();
436            let mut next_start = start;
437            let mut sent_any = false;
438            let mut remaining_limit = limit.unwrap_or(u32::MAX);
439            // Create a watch receiver for head updates (for subscriptions)
440            // TODO: Make this an Option and only do this for subscriptions?
441            let mut head_rx = request_handler.watch_head();
442            // If non-subscription read, capture head to preserve point-in-time semantics
443            let captured_head = if !subscribe {
444                request_handler.head().await.unwrap_or(None)
445            } else {
446                None
447            };
448            loop {
449                // If this is a subscription, exit if the client
450                // has gone away or the server is shutting down.
451                if subscribe {
452                    if tx.is_closed() {
453                        break;
454                    }
455                    if *shutdown_watch_rx.borrow() {
456                        break;
457                    }
458                }
459                // Determine per-iteration limit.
460                let read_limit = remaining_limit.min(batch_size);
461                // If subscription and remaining exhausted (limit reached), terminate
462                if subscribe && limit.is_some() && remaining_limit == 0 {
463                    break;
464                }
465                match request_handler
466                    .read(query_clone.clone(), next_start, backwards, Some(read_limit))
467                    .await
468                {
469                    Ok((dcb_sequenced_events, head)) => {
470                        // Capture the original length before consuming events
471                        let original_len = dcb_sequenced_events.len();
472
473                        // Filter and map events, discarding those with position > captured_head
474                        let sequenced_event_protos: Vec<umadb_proto::v1::SequencedEvent> =
475                            dcb_sequenced_events
476                                .into_iter()
477                                .filter(|e| {
478                                    if let Some(h) = captured_head {
479                                        e.position <= h
480                                    } else {
481                                        true
482                                    }
483                                })
484                                .map(umadb_proto::v1::SequencedEvent::from)
485                                .collect();
486
487                        let reached_captured_head = if captured_head.is_some() {
488                            // Check if we filtered out any events
489                            sequenced_event_protos.len() < original_len
490                        } else {
491                            false
492                        };
493
494                        // Calculate head to send based on context
495                        // For subscriptions: use current head
496                        // For unlimited non-subscription reads: use captured_head
497                        // For limited reads: use last event position (or current head if empty)
498                        let last_event_position = sequenced_event_protos.last().map(|e| e.position);
499                        let head_to_send = if subscribe {
500                            head
501                        } else if limit.is_none() {
502                            captured_head
503                        } else {
504                            last_event_position.or(head)
505                        };
506
507                        if sequenced_event_protos.is_empty() {
508                            // Only send an empty response to communicate head if this is the first
509                            if !sent_any {
510                                let response = umadb_proto::v1::ReadResponse {
511                                    events: vec![],
512                                    head: head_to_send,
513                                };
514                                let _ = tx.send(Ok(response)).await;
515                            }
516                            // For subscriptions, wait for new events instead of terminating
517                            if subscribe {
518                                // Wait for either a new head or a server shutdown signal
519                                tokio::select! {
520                                    _ = head_rx.changed() => {},
521                                    _ = shutdown_watch_rx.changed() => {},
522                                    _ = tx.closed() => {},
523                                }
524                                continue;
525                            }
526                            break;
527                        }
528
529                        // Capture values needed after sequenced_event_protos is moved
530                        let sent_count = sequenced_event_protos.len() as u32;
531
532                        let response = umadb_proto::v1::ReadResponse {
533                            events: sequenced_event_protos,
534                            head: head_to_send,
535                        };
536
537                        if tx.send(Ok(response)).await.is_err() {
538                            break;
539                        }
540                        sent_any = true;
541
542                        // Advance the cursor (use a new reader on the next loop iteration)
543                        next_start =
544                            last_event_position.map(|p| if !backwards { p + 1 } else { p - 1 });
545
546                        // Stop streaming further if we reached the
547                        // captured head boundary (non-subscriber only).
548                        if reached_captured_head && !subscribe {
549                            break;
550                        }
551
552                        // Decrease the remaining overall limit if any, and stop if reached
553                        if limit.is_some() {
554                            if remaining_limit <= sent_count {
555                                remaining_limit = 0;
556                            } else {
557                                remaining_limit -= sent_count;
558                            }
559                            if remaining_limit == 0 {
560                                break;
561                            }
562                        }
563
564                        // Yield to let other tasks progress under high concurrency
565                        tokio::task::yield_now().await;
566                    }
567                    Err(e) => {
568                        let _ = tx.send(Err(status_from_dcb_error(e))).await;
569                        break;
570                    }
571                }
572            }
573        });
574
575        // Return the receiver as a stream
576        Ok(Response::new(
577            Box::pin(ReceiverStream::new(rx)) as Self::ReadStream
578        ))
579    }
580
581    async fn append(
582        &self,
583        request: Request<umadb_proto::v1::AppendRequest>,
584    ) -> Result<Response<umadb_proto::v1::AppendResponse>, Status> {
585        // Enforce API key if configured
586        if let Some(expected) = &self.api_key {
587            let auth = request.metadata().get("authorization");
588            let expected_val = format!("Bearer {}", expected);
589            let ok = auth
590                .and_then(|m| m.to_str().ok())
591                .map(|s| s == expected_val)
592                .unwrap_or(false);
593            if !ok {
594                return Err(status_from_dcb_error(DCBError::AuthenticationError(
595                    "missing or invalid API key".to_string(),
596                )));
597            }
598        }
599        let req = request.into_inner();
600
601        // Convert protobuf types to API types
602        let events: Vec<DCBEvent> = match req.events.into_iter().map(|e| e.try_into()).collect() {
603            Ok(events) => events,
604            Err(e) => {
605                return Err(status_from_dcb_error(e));
606            }
607        };
608        let condition = req.condition.map(|c| c.into());
609
610        // Call the event store append method
611        match self
612            .request_handler
613            .append(
614                events,
615                condition,
616                req.tracking_info.map(|t| TrackingInfo {
617                    source: t.source,
618                    position: t.position,
619                }),
620            )
621            .await
622        {
623            Ok(position) => Ok(Response::new(umadb_proto::v1::AppendResponse { position })),
624            Err(e) => Err(status_from_dcb_error(e)),
625        }
626    }
627
628    async fn head(
629        &self,
630        request: Request<umadb_proto::v1::HeadRequest>,
631    ) -> Result<Response<umadb_proto::v1::HeadResponse>, Status> {
632        // Enforce API key if configured
633        if let Some(expected) = &self.api_key {
634            let auth = request.metadata().get("authorization");
635            let expected_val = format!("Bearer {}", expected);
636            let ok = auth
637                .and_then(|m| m.to_str().ok())
638                .map(|s| s == expected_val)
639                .unwrap_or(false);
640            if !ok {
641                return Err(status_from_dcb_error(DCBError::AuthenticationError(
642                    "missing or invalid API key".to_string(),
643                )));
644            }
645        }
646        // Call the event store head method
647        match self.request_handler.head().await {
648            Ok(position) => {
649                // Return the position as a response
650                Ok(Response::new(umadb_proto::v1::HeadResponse { position }))
651            }
652            Err(e) => Err(status_from_dcb_error(e)),
653        }
654    }
655
656    async fn get_tracking_info(
657        &self,
658        request: Request<umadb_proto::v1::TrackingRequest>,
659    ) -> Result<Response<umadb_proto::v1::TrackingResponse>, Status> {
660        // Enforce API key if configured
661        if let Some(expected) = &self.api_key {
662            let auth = request.metadata().get("authorization");
663            let expected_val = format!("Bearer {}", expected);
664            let ok = auth
665                .and_then(|m| m.to_str().ok())
666                .map(|s| s == expected_val)
667                .unwrap_or(false);
668            if !ok {
669                return Err(status_from_dcb_error(DCBError::AuthenticationError(
670                    "missing or invalid API key".to_string(),
671                )));
672            }
673        }
674        let req = request.into_inner();
675        match self.request_handler.get_tracking_info(req.source).await {
676            Ok(position) => Ok(Response::new(umadb_proto::v1::TrackingResponse {
677                position,
678            })),
679            Err(e) => Err(status_from_dcb_error(e)),
680        }
681    }
682}
683
// Message types for communication between the gRPC server and the request handler's writer thread
enum WriterRequest {
    // Append a batch of events; the outcome is delivered via `response_tx`.
    Append {
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        tracking_info: Option<TrackingInfo>,
        // Receives the resulting position on success, or the append error.
        response_tx: oneshot::Sender<DCBResult<u64>>,
    },
    // Ask the writer thread to stop processing requests.
    Shutdown,
}
694
// Thread-safe request handler
struct RequestHandler {
    // Shared MVCC storage engine handle.
    mvcc: Arc<Mvcc>,
    // Watch channel publishing the latest head position (None when empty).
    head_watch_tx: watch::Sender<Option<u64>>,
    // Queue of write requests consumed by the dedicated writer thread.
    writer_request_tx: mpsc::Sender<WriterRequest>,
}
701
702impl RequestHandler {
703    fn new<P: AsRef<Path> + Send + 'static>(path: P) -> DCBResult<Self> {
704        // Create a channel for sending requests to the writer thread
705        let (request_tx, mut request_rx) = mpsc::channel::<WriterRequest>(1024);
706
707        // Build a shared Mvcc instance (Arc) upfront so reads can proceed concurrently
708        let p = path.as_ref();
709        let file_path = if p.is_dir() {
710            p.join(DEFAULT_DB_FILENAME)
711        } else {
712            p.to_path_buf()
713        };
714        let mvcc = Arc::new(Mvcc::new(&file_path, DEFAULT_PAGE_SIZE, false)?);
715
716        // Initialize the head watch channel with the current head.
717        let init_head = {
718            let (_, header) = mvcc.get_latest_header()?;
719            let last = header.next_position.0.saturating_sub(1);
720            if last == 0 { None } else { Some(last) }
721        };
722        let (head_tx, _head_rx) = watch::channel::<Option<u64>>(init_head);
723
724        // Spawn a thread for processing writer requests.
725        let mvcc_for_writer = mvcc.clone();
726        let head_tx_writer = head_tx.clone();
727        thread::spawn(move || {
728            let db = UmaDB::from_arc(mvcc_for_writer);
729
730            // Create a runtime for processing writer requests.
731            let rt = Runtime::new().unwrap();
732
733            // Process writer requests.
734            rt.block_on(async {
735                while let Some(request) = request_rx.recv().await {
736                    match request {
737                        WriterRequest::Append {
738                            events,
739                            condition,
740                            tracking_info,
741                            response_tx,
742                        } => {
743                            // Batch processing: drain any immediately available requests
744                            // let mut items: Vec<(Vec<DCBEvent>, Option<DCBAppendCondition>)> =
745                            //     Vec::new();
746
747                            let mut total_events = 0;
748                            total_events += events.len();
749                            // items.push((events, condition));
750
751                            let mvcc = &db.mvcc;
752                            let mut writer = match mvcc.writer() {
753                                Ok(writer) => writer,
754                                Err(err) => {
755                                    let _ = response_tx.send(Err(err));
756                                    continue;
757                                }
758                            };
759
760                            let mut responders: Vec<oneshot::Sender<DCBResult<u64>>> = Vec::new();
761                            let mut results: Vec<DCBResult<u64>> = Vec::new();
762
763                            // Track abort state for non-integrity error within the batch
764                            let mut abort_idx: Option<usize> = None;
765                            let mut abort_err: Option<DCBError> = None;
766
767                            responders.push(response_tx);
768                            let result = UmaDB::process_append_request(
769                                events,
770                                condition,
771                                tracking_info,
772                                mvcc,
773                                &mut writer,
774                            );
775                            // Record result and possibly mark abort
776                            match &result {
777                                Ok(_) => results.push(result),
778                                Err(e) if is_integrity_error(e) => {
779                                    results.push(Err(clone_dcb_error(e)))
780                                }
781                                Err(e) => {
782                                    abort_idx = Some(0);
783                                    abort_err = Some(clone_dcb_error(e));
784                                    results.push(Err(clone_dcb_error(e)));
785                                }
786                            }
787
788                            // Drain the channel for more pending writer requests without awaiting.
789                            // Important: do not drop a popped request when hitting the batch limit.
790                            // We stop draining BEFORE attempting to recv if we've reached the limit.
791                            loop {
792                                if total_events >= APPEND_BATCH_MAX_EVENTS {
793                                    break;
794                                }
795                                // Stop draining if we've already decided to abort
796                                if abort_idx.is_some() {
797                                    break;
798                                }
799                                match request_rx.try_recv() {
800                                    Ok(WriterRequest::Append {
801                                        events,
802                                        condition,
803                                        tracking_info,
804                                        response_tx,
805                                    }) => {
806                                        let ev_len = events.len();
807                                        let idx_in_batch = responders.len();
808                                        responders.push(response_tx);
809                                        let res_next = UmaDB::process_append_request(
810                                            events,
811                                            condition,
812                                            tracking_info,
813                                            mvcc,
814                                            &mut writer,
815                                        );
816                                        match &res_next {
817                                            Ok(_) => results.push(res_next),
818                                            Err(e) if is_integrity_error(e) => {
819                                                results.push(Err(clone_dcb_error(e)))
820                                            }
821                                            Err(e) => {
822                                                abort_idx = Some(idx_in_batch);
823                                                abort_err = Some(clone_dcb_error(e));
824                                                results.push(Err(clone_dcb_error(e)));
825                                                // Do not accumulate more into the batch
826                                            }
827                                        }
828                                        total_events += ev_len;
829                                    }
830                                    Ok(WriterRequest::Shutdown) => {
831                                        // Push back the shutdown signal by breaking and letting
832                                        // outer loop handle after batch. We'll process the
833                                        // current batch first, then break the outer loop on
834                                        // the next iteration when the channel is empty.
835                                        break;
836                                    }
837                                    Err(mpsc::error::TryRecvError::Empty) => {
838                                        break;
839                                    }
840                                    Err(mpsc::error::TryRecvError::Disconnected) => break,
841                                }
842                            }
843                            // println!("Total events: {total_events}");
844
845                            if let (Some(failed_at), Some(orig_err)) = (abort_idx, abort_err) {
846                                // Abort batch: skip commit; respond to all items in this batch
847                                let shadow = shadow_for_batch_abort(&orig_err);
848                                for (i, tx) in responders.into_iter().enumerate() {
849                                    if i == failed_at {
850                                        let _ = tx.send(Err(clone_dcb_error(&orig_err)));
851                                    } else {
852                                        let _ = tx.send(Err(clone_dcb_error(&shadow)));
853                                    }
854                                }
855                                // Do not update head, since nothing was committed
856                                continue;
857                            }
858
859                            // Single commit at the end of the batch
860                            let batch_result = match mvcc.commit(&mut writer) {
861                                Ok(_) => Ok(results),
862                                Err(err) => Err(err),
863                            };
864
865                            match batch_result {
866                                Ok(results) => {
867                                    // Send individual results back to requesters
868                                    for (res, tx) in results.into_iter().zip(responders.into_iter())
869                                    {
870                                        let _ = tx.send(res);
871                                    }
872                                    // After a successful batch commit, publish the updated head from writer.next_position.
873                                    let last_committed = writer.next_position.0.saturating_sub(1);
874                                    let new_head = if last_committed == 0 {
875                                        None
876                                    } else {
877                                        Some(last_committed)
878                                    };
879                                    let _ = head_tx_writer.send(new_head);
880                                }
881                                Err(e) => {
882                                    // If the batch failed as a whole (e.g., commit failed), propagate the SAME error to all responders.
883                                    // DCBError is not Clone (contains io::Error), so reconstruct a best-effort copy by using its Display text
884                                    // for Io and cloning data for other variants.
885                                    let total = responders.len();
886                                    let mut iter = responders.into_iter();
887                                    for _ in 0..total {
888                                        if let Some(tx) = iter.next() {
889                                            let _ = tx.send(Err(clone_dcb_error(&e)));
890                                        }
891                                    }
892                                }
893                            }
894                        }
895                        WriterRequest::Shutdown => {
896                            break;
897                        }
898                    }
899                }
900            });
901        });
902
903        Ok(Self {
904            mvcc,
905            head_watch_tx: head_tx,
906            writer_request_tx: request_tx,
907        })
908    }
909
910    async fn read(
911        &self,
912        query: Option<DCBQuery>,
913        start: Option<u64>,
914        backwards: bool,
915        limit: Option<u32>,
916    ) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
917        let reader = self.mvcc.reader()?;
918        let last_committed_position = reader.next_position.0.saturating_sub(1);
919
920        let q = query.unwrap_or(DCBQuery { items: vec![] });
921        let start_position = start.map(Position);
922
923        let events = read_conditional(
924            &self.mvcc,
925            &std::collections::HashMap::new(),
926            reader.events_tree_root_id,
927            reader.tags_tree_root_id,
928            q,
929            start_position,
930            backwards,
931            limit,
932            false,
933        )
934        .map_err(|e| DCBError::Corruption(format!("{e}")))?;
935
936        let head = if limit.is_none() {
937            if last_committed_position == 0 {
938                None
939            } else {
940                Some(last_committed_position)
941            }
942        } else {
943            events.last().map(|e| e.position)
944        };
945
946        Ok((events, head))
947    }
948
949    async fn head(&self) -> DCBResult<Option<u64>> {
950        let (_, header) = self
951            .mvcc
952            .get_latest_header()
953            .map_err(|e| DCBError::Corruption(format!("{e}")))?;
954        let last = header.next_position.0.saturating_sub(1);
955        if last == 0 { Ok(None) } else { Ok(Some(last)) }
956    }
957
958    async fn get_tracking_info(&self, source: String) -> DCBResult<Option<u64>> {
959        let db = UmaDB::from_arc(self.mvcc.clone());
960        db.get_tracking_info(&source)
961    }
962
    /// Append `events`, optionally guarded by an append `condition` and
    /// accompanied by upstream `tracking_info`.
    ///
    /// Before queueing the write, an optimistic pre-check runs on a reader
    /// snapshot:
    /// * If the condition already matches an existing event, the request is
    ///   tested for idempotence. An idempotent request short-circuits and
    ///   returns the previously recorded position; otherwise the append fails
    ///   with `DCBError::IntegrityError`.
    /// * If nothing matches, the condition's `after` bound is advanced to the
    ///   head seen by this snapshot, so the writer only has to re-validate
    ///   events appended after the snapshot was taken.
    ///
    /// The write itself is serialized through the writer task via
    /// `writer_request_tx`; the result comes back on a oneshot channel.
    ///
    /// # Errors
    /// Returns `DCBError::IntegrityError` on a condition violation, any read
    /// error surfaced by the pre-check, or `DCBError::Io` when the writer
    /// channel is closed.
    pub async fn append(
        &self,
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        tracking_info: Option<TrackingInfo>,
    ) -> DCBResult<u64> {
        // Concurrent pre-check of the given condition using a reader in a blocking thread.
        let pre_append_decision = if let Some(mut given_condition) = condition {
            let reader = self.mvcc.reader()?;
            // Head observed by this snapshot (None when the store is empty).
            let current_head = {
                let last = reader.next_position.0.saturating_sub(1);
                if last == 0 { None } else { Some(last) }
            };

            // Perform conditional read on the snapshot (limit 1) starting after the given position
            let from = given_condition.after.map(|after| Position(after + 1));
            let empty_dirty = std::collections::HashMap::new();
            let found = read_conditional(
                &self.mvcc,
                &empty_dirty,
                reader.events_tree_root_id,
                reader.tags_tree_root_id,
                given_condition.fail_if_events_match.clone(),
                from,
                false,
                Some(1),
                false,
            )?;

            if let Some(matched) = found.first() {
                // Found one event — consider if the request is idempotent...
                match is_request_idempotent(
                    &self.mvcc,
                    &empty_dirty,
                    reader.events_tree_root_id,
                    reader.tags_tree_root_id,
                    &events,
                    given_condition.fail_if_events_match.clone(),
                    from,
                ) {
                    Ok(Some(last_recorded_position)) => {
                        // Request is idempotent; skip actual append
                        PreAppendDecision::AlreadyAppended(last_recorded_position)
                    }
                    Ok(None) => {
                        // Integrity violation
                        let msg = format!(
                            "condition: {:?} matched: {:?}",
                            given_condition.clone(),
                            matched,
                        );
                        return Err(DCBError::IntegrityError(msg));
                    }
                    Err(err) => {
                        // Propagate underlying read error
                        return Err(err);
                    }
                }
            } else {
                // No match found: we can advance 'after' to the current head observed by this reader
                // so the writer thread can skip re-checking events at or before this snapshot.
                let new_after = std::cmp::max(
                    given_condition.after.unwrap_or(0),
                    current_head.unwrap_or(0),
                );
                given_condition.after = Some(new_after);

                PreAppendDecision::UseCondition(Some(given_condition))
            }
        } else {
            // No condition provided at all
            PreAppendDecision::UseCondition(None)
        };

        // Handle the pre-check decision
        match pre_append_decision {
            PreAppendDecision::AlreadyAppended(last_found_position) => {
                // ✅ Request was idempotent — just return the existing position.
                Ok(last_found_position)
            }
            PreAppendDecision::UseCondition(adjusted_condition) => {
                // ✅ Proceed with append operation on the writer thread.
                let (response_tx, response_rx) = oneshot::channel();

                self.writer_request_tx
                    .send(WriterRequest::Append {
                        events,
                        condition: adjusted_condition,
                        tracking_info,
                        response_tx,
                    })
                    .await
                    .map_err(|_| {
                        DCBError::Io(std::io::Error::other(
                            "Failed to send append request to EventStore thread",
                        ))
                    })?;

                response_rx.await.map_err(|_| {
                    DCBError::Io(std::io::Error::other(
                        "Failed to receive append response from EventStore thread",
                    ))
                })?
            }
        }
    }
1068
1069    fn watch_head(&self) -> watch::Receiver<Option<u64>> {
1070        self.head_watch_tx.subscribe()
1071    }
1072
1073    #[allow(dead_code)]
1074    async fn shutdown(&self) {
1075        let _ = self.writer_request_tx.send(WriterRequest::Shutdown).await;
1076    }
1077}
1078
1079// Clone implementation for EventStoreHandle
1080impl Clone for RequestHandler {
1081    fn clone(&self) -> Self {
1082        Self {
1083            mvcc: self.mvcc.clone(),
1084            head_watch_tx: self.head_watch_tx.clone(),
1085            writer_request_tx: self.writer_request_tx.clone(),
1086        }
1087    }
1088}
1089
/// Outcome of the optimistic pre-check performed in `append` before handing
/// the request to the writer task.
#[derive(Debug)]
enum PreAppendDecision {
    /// Proceed with this (possibly adjusted) condition
    UseCondition(Option<DCBAppendCondition>),
    /// Skip append operation because the request was idempotent; return last recorded position
    AlreadyAppended(u64),
}