umadb_server/
lib.rs

1use futures::Stream;
2use std::fs;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::LazyLock;
7use std::thread;
8use std::time::Instant;
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio_stream::wrappers::ReceiverStream;
11use tonic::transport::{Identity, ServerTlsConfig};
12use tonic::{Request, Response, Status, transport::Server};
13use umadb_core::db::{
14    DEFAULT_DB_FILENAME, DEFAULT_PAGE_SIZE, UmaDB, is_request_idempotent, read_conditional,
15};
16use umadb_core::mvcc::Mvcc;
17use umadb_dcb::{
18    DCBAppendCondition, DCBError, DCBEvent, DCBQuery, DCBResult, DCBSequencedEvent, TrackingInfo,
19};
20
21use tokio::runtime::Runtime;
22use tonic::codegen::http;
23use tonic::transport::server::TcpIncoming;
24use umadb_core::common::Position;
25
26use std::convert::Infallible;
27use std::future::Future;
28use std::task::{Context, Poll};
29use tonic::server::NamedService;
30use umadb_proto::status_from_dcb_error;
31
/// Rewrites legacy (pre-v1, unversioned API) gRPC request paths to their
/// `umadb.v1.DCB` equivalents before delegating to the wrapped service.
/// Exists purely to maintain compatibility for the very early unversioned API.
#[derive(Clone, Debug)]
pub struct PathRewriterService<S> {
    // The wrapped inner tower service that handles the (possibly rewritten) request.
    inner: S,
}
37
38impl<S> tower::Service<http::Request<tonic::body::Body>> for PathRewriterService<S>
39where
40    S: tower::Service<
41            http::Request<tonic::body::Body>,
42            Response = http::Response<tonic::body::Body>,
43            Error = Infallible,
44        > + Clone
45        + Send
46        + 'static,
47    S::Future: Send + 'static,
48{
49    type Response = S::Response;
50    type Error = S::Error;
51    type Future =
52        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
53
54    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
55        self.inner.poll_ready(cx)
56    }
57
58    fn call(&mut self, mut req: http::Request<tonic::body::Body>) -> Self::Future {
59        let uri = req.uri().clone();
60        let path = uri.path();
61
62        // Check and rewrite the path string first
63        if path.starts_with("/umadb.UmaDBService/") {
64            let new_path_str = path.replace("/umadb.UmaDBService/", "/umadb.v1.DCB/");
65
66            // Use the existing authority and scheme if present, otherwise default to a simple path-only URI structure
67            // which is often safer than hardcoded hostnames in internal systems.
68            let new_uri = if let (Some(scheme), Some(authority)) = (uri.scheme(), uri.authority()) {
69                // If we have all components, try to build the full URI
70                http::Uri::builder()
71                    .scheme(scheme.clone())
72                    .authority(authority.clone())
73                    .path_and_query(new_path_str.as_str())
74                    .build()
75                    .ok() // Convert the final build Result into an Option
76            } else {
77                // Fallback for malformed requests (missing scheme/authority)
78                // Just try to build a path-only URI
79                new_path_str.parse::<http::Uri>().ok()
80            };
81
82            if let Some(final_uri) = new_uri {
83                *req.uri_mut() = final_uri;
84            } else {
85                eprintln!("Failed to construct valid URI for path: {}", path);
86            }
87        }
88
89        let fut = self.inner.call(req);
90        Box::pin(fut)
91    }
92}
93
// Forward the inner service's name so tonic's router can still register and
// route the wrapped gRPC service under its original service name.
impl<S: NamedService> NamedService for PathRewriterService<S> {
    const NAME: &'static str = S::NAME;
}
98
/// Tower layer that wraps a service in a `PathRewriterService`, enabling
/// legacy pre-v1 request paths to be rewritten at the server level.
#[derive(Clone, Debug)]
pub struct PathRewriterLayer;
101
impl<S> tower::Layer<S> for PathRewriterLayer
where
    S: tower::Service<
            http::Request<tonic::body::Body>,
            Response = http::Response<tonic::body::Body>,
            Error = Infallible,
        > + Clone
        + Send
        + 'static,
    S::Future: Send + 'static,
{
    type Service = PathRewriterService<S>;

    // The layer is stateless: wrapping requires no configuration.
    fn layer(&self, inner: S) -> Self::Service {
        PathRewriterService { inner }
    }
}
119
// Lazily initialised on FIRST ACCESS, not literally at process start; the first
// caller of `uptime()` anchors the clock. The server reads it during startup
// (see the startup log line), so in practice it approximates process start.
static START_TIME: LazyLock<Instant> = LazyLock::new(Instant::now);
121
/// Maximum number of events coalesced into one writer batch before committing.
const APPEND_BATCH_MAX_EVENTS: usize = 2000;
/// Events per streamed `ReadResponse` when the client does not specify a batch size.
const READ_RESPONSE_BATCH_SIZE_DEFAULT: u32 = 100;
/// Upper bound clamped onto client-requested read batch sizes.
const READ_RESPONSE_BATCH_SIZE_MAX: u32 = 5000;
125
// Optional TLS configuration helpers
/// PEM-encoded certificate/key pair used to enable TLS on the server listener.
#[derive(Clone, Debug)]
pub struct ServerTlsOptions {
    /// PEM-encoded server certificate.
    pub cert_pem: Vec<u8>,
    /// PEM-encoded private key matching `cert_pem`.
    pub key_pem: Vec<u8>,
}
132
133pub fn uptime() -> std::time::Duration {
134    START_TIME.elapsed()
135}
136
137fn build_server_builder_with_options(tls: Option<ServerTlsOptions>) -> Server {
138    use std::time::Duration;
139    let mut server_builder = Server::builder()
140        .http2_keepalive_interval(Some(Duration::from_secs(5)))
141        .http2_keepalive_timeout(Some(Duration::from_secs(10)))
142        .initial_stream_window_size(Some(4 * 1024 * 1024))
143        .initial_connection_window_size(Some(8 * 1024 * 1024))
144        .tcp_nodelay(true)
145        .concurrency_limit_per_connection(1024);
146
147    if let Some(opts) = tls {
148        let identity = Identity::from_pem(opts.cert_pem, opts.key_pem);
149        server_builder = server_builder
150            .tls_config(ServerTlsConfig::new().identity(identity))
151            .expect("failed to apply TLS config");
152    }
153
154    server_builder
155}
156
/// Start the gRPC server without TLS or API-key authentication.
///
/// * `path` — database file, or directory containing the default DB file.
/// * `addr` — socket address to bind, e.g. `"127.0.0.1:50051"`.
/// * `shutdown_rx` — resolving this one-shot channel triggers graceful shutdown.
pub async fn start_server<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None, None).await
}
165
/// Start server with TLS using PEM-encoded cert and key.
///
/// `cert_pem`/`key_pem` are the raw PEM bytes (not file paths); see
/// `start_server_secure_from_files` for the path-based variant.
pub async fn start_server_secure<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    cert_pem: Vec<u8>,
    key_pem: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error>> {
    let tls = ServerTlsOptions { cert_pem, key_pem };
    start_server_internal(path, addr, shutdown_rx, Some(tls), None).await
}
177
178/// Convenience: load cert and key from filesystem paths
179pub async fn start_server_secure_from_files<
180    P: AsRef<Path> + Send + 'static,
181    CP: AsRef<Path>,
182    KP: AsRef<Path>,
183>(
184    path: P,
185    addr: &str,
186    shutdown_rx: oneshot::Receiver<()>,
187    cert_path: CP,
188    key_path: KP,
189) -> Result<(), Box<dyn std::error::Error>> {
190    let cert_path_ref = cert_path.as_ref();
191    let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
192        format!(
193            "Failed to open TLS certificate file '{}': {}",
194            cert_path_ref.display(),
195            e
196        )
197        .into()
198    })?;
199
200    let key_path_ref = key_path.as_ref();
201    let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
202        format!(
203            "Failed to open TLS key file '{}': {}",
204            key_path_ref.display(),
205            e
206        )
207        .into()
208    })?;
209    start_server_secure(path, addr, shutdown_rx, cert_pem, key_pem).await
210}
211
/// Start server (insecure) requiring an API key for all RPCs.
///
/// Clients must send `authorization: Bearer <api_key>` on every request;
/// the check is performed inside each RPC handler.
pub async fn start_server_with_api_key<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    api_key: String,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None, Some(api_key)).await
}
221
/// Start TLS server requiring an API key for all RPCs.
///
/// Combines the TLS setup of `start_server_secure` with the bearer-token
/// requirement of `start_server_with_api_key`.
pub async fn start_server_secure_with_api_key<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    cert_pem: Vec<u8>,
    key_pem: Vec<u8>,
    api_key: String,
) -> Result<(), Box<dyn std::error::Error>> {
    let tls = ServerTlsOptions { cert_pem, key_pem };
    start_server_internal(path, addr, shutdown_rx, Some(tls), Some(api_key)).await
}
234
235/// TLS server from files requiring an API key
236pub async fn start_server_secure_from_files_with_api_key<
237    P: AsRef<Path> + Send + 'static,
238    CP: AsRef<Path>,
239    KP: AsRef<Path>,
240>(
241    path: P,
242    addr: &str,
243    shutdown_rx: oneshot::Receiver<()>,
244    cert_path: CP,
245    key_path: KP,
246    api_key: String,
247) -> Result<(), Box<dyn std::error::Error>> {
248    let cert_path_ref = cert_path.as_ref();
249    let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
250        format!(
251            "Failed to open TLS certificate file '{}': {}",
252            cert_path_ref.display(),
253            e
254        )
255        .into()
256    })?;
257
258    let key_path_ref = key_path.as_ref();
259    let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
260        format!(
261            "Failed to open TLS key file '{}': {}",
262            key_path_ref.display(),
263            e
264        )
265        .into()
266    })?;
267    start_server_secure_with_api_key(path, addr, shutdown_rx, cert_pem, key_pem, api_key).await
268}
269
/// Shared server bootstrap: bind the listener, initialise the database-backed
/// DCB service, wire up gRPC health reporting, and serve until `shutdown_rx`
/// resolves.
///
/// `tls` enables TLS when provided; `api_key` enables bearer-token auth
/// (enforced inside each RPC handler, not at the transport layer).
async fn start_server_internal<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    tls: Option<ServerTlsOptions>,
    api_key: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let addr = addr.parse()?;
    // ---- Bind incoming manually like tonic ----
    // Binding up front lets us surface bind failures as a DCB initialization
    // error instead of a late serve() failure.
    let incoming = match TcpIncoming::bind(addr) {
        Ok(incoming) => incoming,
        Err(err) => {
            return Err(Box::new(DCBError::InitializationError(format!(
                "Failed to bind to address {}: {}",
                addr, err
            ))));
        }
    }
    .with_nodelay(Some(true))
    .with_keepalive(Some(std::time::Duration::from_secs(60)));

    // Create a shutdown broadcast channel for terminating ongoing subscriptions
    let (srv_shutdown_tx, srv_shutdown_rx) = watch::channel(false);
    let dcb_server =
        match DCBServer::new(path.as_ref().to_owned(), srv_shutdown_rx, api_key.clone()) {
            Ok(server) => server,
            Err(err) => {
                return Err(Box::new(err));
            }
        };

    println!(
        "UmaDB has {:?} events",
        dcb_server.request_handler.head().await?.unwrap_or(0)
    );
    let tls_mode_display_str = if tls.is_some() {
        "with TLS"
    } else {
        "without TLS"
    };

    let api_key_display_str = if api_key.is_some() {
        "with API key"
    } else {
        "without API key"
    };

    // gRPC Health service setup
    use tonic_health::ServingStatus; // server API expects this enum
    let (health_reporter, health_service) = tonic_health::server::health_reporter();
    // Set overall and service-specific health to SERVING
    health_reporter
        .set_service_status("", ServingStatus::Serving)
        .await;
    health_reporter
        .set_service_status("umadb.v1.DCB", ServingStatus::Serving)
        .await;
    let health_reporter_for_shutdown = health_reporter.clone();

    // Apply PathRewriterLayer at the server level to intercept all requests before routing
    let mut builder = build_server_builder_with_options(tls)
        .layer(PathRewriterLayer)
        .add_service(health_service);

    // Add DCB service (auth enforced inside RPC handlers if configured)
    builder = builder.add_service(dcb_server.into_service());
    let router = builder;

    println!("UmaDB is listening on {addr} ({tls_mode_display_str}, {api_key_display_str})");
    println!("UmaDB started in {:?}", uptime());
    router
        .serve_with_incoming_shutdown(incoming, async move {
            // Wait for an external shutdown trigger
            let _ = shutdown_rx.await;
            // Mark health as NOT_SERVING before shutdown
            let _ = health_reporter_for_shutdown
                .set_service_status("", ServingStatus::NotServing)
                .await;
            let _ = health_reporter_for_shutdown
                .set_service_status("umadb.v1.DCB", ServingStatus::NotServing)
                .await;
            // Broadcast shutdown to all subscription tasks
            let _ = srv_shutdown_tx.send(true);
            println!("UmaDB server shutdown complete");
        })
        .await?;

    Ok(())
}
360
// gRPC server implementation
pub struct DCBServer {
    // Handles reads directly; appends are funnelled through its writer channel.
    pub(crate) request_handler: RequestHandler,
    // Becomes `true` when the server shuts down; watched by subscription streams.
    shutdown_watch_rx: watch::Receiver<bool>,
    // Optional bearer token; when set, every RPC must present
    // `authorization: Bearer <key>`.
    api_key: Option<String>,
}
367
368impl DCBServer {
369    pub fn new<P: AsRef<Path> + Send + 'static>(
370        path: P,
371        shutdown_rx: watch::Receiver<bool>,
372        api_key: Option<String>,
373    ) -> DCBResult<Self> {
374        let command_handler = RequestHandler::new(path)?;
375        Ok(Self {
376            request_handler: command_handler,
377            shutdown_watch_rx: shutdown_rx,
378            api_key,
379        })
380    }
381
382    pub fn into_service(self) -> umadb_proto::v1::dcb_server::DcbServer<Self> {
383        umadb_proto::v1::dcb_server::DcbServer::new(self)
384    }
385}
386
387#[tonic::async_trait]
388impl umadb_proto::v1::dcb_server::Dcb for DCBServer {
389    type ReadStream =
390        Pin<Box<dyn Stream<Item = Result<umadb_proto::v1::ReadResponse, Status>> + Send + 'static>>;
391
392    async fn read(
393        &self,
394        request: Request<umadb_proto::v1::ReadRequest>,
395    ) -> Result<Response<Self::ReadStream>, Status> {
396        // Enforce API key if configured
397        if let Some(expected) = &self.api_key {
398            let auth = request.metadata().get("authorization");
399            let expected_val = format!("Bearer {}", expected);
400            let ok = auth
401                .and_then(|m| m.to_str().ok())
402                .map(|s| s == expected_val)
403                .unwrap_or(false);
404            if !ok {
405                return Err(status_from_dcb_error(DCBError::AuthenticationError(
406                    "missing or invalid API key".to_string(),
407                )));
408            }
409        }
410        let read_request = request.into_inner();
411
412        // Convert protobuf query to DCB types
413        let mut query: Option<DCBQuery> = read_request.query.map(|q| q.into());
414        let start = read_request.start;
415        let backwards = read_request.backwards.unwrap_or(false);
416        let limit = read_request.limit;
417        // Cap requested batch size.
418        let batch_size = read_request
419            .batch_size
420            .unwrap_or(READ_RESPONSE_BATCH_SIZE_DEFAULT)
421            .clamp(1, READ_RESPONSE_BATCH_SIZE_MAX);
422        let subscribe = read_request.subscribe.unwrap_or(false);
423
424        // Create a channel for streaming responses (deeper buffer to reduce backpressure under concurrency)
425        let (tx, rx) = mpsc::channel(2048);
426        // Clone the request handler.
427        let request_handler = self.request_handler.clone();
428        // Clone the shutdown watch receiver.
429        let mut shutdown_watch_rx = self.shutdown_watch_rx.clone();
430
431        // Spawn a task to handle the read operation and stream multiple batches
432        tokio::spawn(async move {
433            // Ensure we can reuse the same query across batches
434            let query_clone = query.take();
435            let mut next_start = start;
436            let mut sent_any = false;
437            let mut remaining_limit = limit.unwrap_or(u32::MAX);
438            // Create a watch receiver for head updates (for subscriptions)
439            // TODO: Make this an Option and only do this for subscriptions?
440            let mut head_rx = request_handler.watch_head();
441            // If non-subscription read, capture head to preserve point-in-time semantics
442            let captured_head = if !subscribe {
443                request_handler.head().await.unwrap_or(None)
444            } else {
445                None
446            };
447            loop {
448                // If this is a subscription, exit if the client
449                // has gone away or the server is shutting down.
450                if subscribe {
451                    if tx.is_closed() {
452                        break;
453                    }
454                    if *shutdown_watch_rx.borrow() {
455                        break;
456                    }
457                }
458                // Determine per-iteration limit.
459                let read_limit = remaining_limit.min(batch_size);
460                // If subscription and remaining exhausted (limit reached), terminate
461                if subscribe && limit.is_some() && remaining_limit == 0 {
462                    break;
463                }
464                match request_handler
465                    .read(query_clone.clone(), next_start, backwards, Some(read_limit))
466                    .await
467                {
468                    Ok((dcb_sequenced_events, head)) => {
469                        // Capture the original length before consuming events
470                        let original_len = dcb_sequenced_events.len();
471
472                        // Filter and map events, discarding those with position > captured_head
473                        let sequenced_event_protos: Vec<umadb_proto::v1::SequencedEvent> =
474                            dcb_sequenced_events
475                                .into_iter()
476                                .filter(|e| {
477                                    if let Some(h) = captured_head {
478                                        e.position <= h
479                                    } else {
480                                        true
481                                    }
482                                })
483                                .map(umadb_proto::v1::SequencedEvent::from)
484                                .collect();
485
486                        let reached_captured_head = if captured_head.is_some() {
487                            // Check if we filtered out any events
488                            sequenced_event_protos.len() < original_len
489                        } else {
490                            false
491                        };
492
493                        // Calculate head to send based on context
494                        // For subscriptions: use current head
495                        // For unlimited non-subscription reads: use captured_head
496                        // For limited reads: use last event position (or current head if empty)
497                        let last_event_position = sequenced_event_protos.last().map(|e| e.position);
498                        let head_to_send = if subscribe {
499                            head
500                        } else if limit.is_none() {
501                            captured_head
502                        } else {
503                            last_event_position.or(head)
504                        };
505
506                        if sequenced_event_protos.is_empty() {
507                            // Only send an empty response to communicate head if this is the first
508                            if !sent_any {
509                                let response = umadb_proto::v1::ReadResponse {
510                                    events: vec![],
511                                    head: head_to_send,
512                                };
513                                let _ = tx.send(Ok(response)).await;
514                            }
515                            // For subscriptions, wait for new events instead of terminating
516                            if subscribe {
517                                // Wait for either a new head or a server shutdown signal
518                                tokio::select! {
519                                    _ = head_rx.changed() => {},
520                                    _ = shutdown_watch_rx.changed() => {},
521                                    _ = tx.closed() => {},
522                                }
523                                continue;
524                            }
525                            break;
526                        }
527
528                        // Capture values needed after sequenced_event_protos is moved
529                        let sent_count = sequenced_event_protos.len() as u32;
530
531                        let response = umadb_proto::v1::ReadResponse {
532                            events: sequenced_event_protos,
533                            head: head_to_send,
534                        };
535
536                        if tx.send(Ok(response)).await.is_err() {
537                            break;
538                        }
539                        sent_any = true;
540
541                        // Advance the cursor (use a new reader on the next loop iteration)
542                        next_start =
543                            last_event_position.map(|p| if !backwards { p + 1 } else { p - 1 });
544
545                        // Stop streaming further if we reached the
546                        // captured head boundary (non-subscriber only).
547                        if reached_captured_head && !subscribe {
548                            break;
549                        }
550
551                        // Decrease the remaining overall limit if any, and stop if reached
552                        if limit.is_some() {
553                            if remaining_limit <= sent_count {
554                                remaining_limit = 0;
555                            } else {
556                                remaining_limit -= sent_count;
557                            }
558                            if remaining_limit == 0 {
559                                break;
560                            }
561                        }
562
563                        // Yield to let other tasks progress under high concurrency
564                        tokio::task::yield_now().await;
565                    }
566                    Err(e) => {
567                        let _ = tx.send(Err(status_from_dcb_error(e))).await;
568                        break;
569                    }
570                }
571            }
572        });
573
574        // Return the receiver as a stream
575        Ok(Response::new(
576            Box::pin(ReceiverStream::new(rx)) as Self::ReadStream
577        ))
578    }
579
580    async fn append(
581        &self,
582        request: Request<umadb_proto::v1::AppendRequest>,
583    ) -> Result<Response<umadb_proto::v1::AppendResponse>, Status> {
584        // Enforce API key if configured
585        if let Some(expected) = &self.api_key {
586            let auth = request.metadata().get("authorization");
587            let expected_val = format!("Bearer {}", expected);
588            let ok = auth
589                .and_then(|m| m.to_str().ok())
590                .map(|s| s == expected_val)
591                .unwrap_or(false);
592            if !ok {
593                return Err(status_from_dcb_error(DCBError::AuthenticationError(
594                    "missing or invalid API key".to_string(),
595                )));
596            }
597        }
598        let req = request.into_inner();
599
600        // Convert protobuf types to API types
601        let events: Vec<DCBEvent> = match req.events.into_iter().map(|e| e.try_into()).collect() {
602            Ok(events) => events,
603            Err(e) => {
604                return Err(status_from_dcb_error(e));
605            }
606        };
607        let condition = req.condition.map(|c| c.into());
608
609        // Call the event store append method
610        match self
611            .request_handler
612            .append(
613                events,
614                condition,
615                req.tracking_info.map(|t| TrackingInfo {
616                    source: t.source,
617                    position: t.position,
618                }),
619            )
620            .await
621        {
622            Ok(position) => Ok(Response::new(umadb_proto::v1::AppendResponse { position })),
623            Err(e) => Err(status_from_dcb_error(e)),
624        }
625    }
626
627    async fn head(
628        &self,
629        request: Request<umadb_proto::v1::HeadRequest>,
630    ) -> Result<Response<umadb_proto::v1::HeadResponse>, Status> {
631        // Enforce API key if configured
632        if let Some(expected) = &self.api_key {
633            let auth = request.metadata().get("authorization");
634            let expected_val = format!("Bearer {}", expected);
635            let ok = auth
636                .and_then(|m| m.to_str().ok())
637                .map(|s| s == expected_val)
638                .unwrap_or(false);
639            if !ok {
640                return Err(status_from_dcb_error(DCBError::AuthenticationError(
641                    "missing or invalid API key".to_string(),
642                )));
643            }
644        }
645        // Call the event store head method
646        match self.request_handler.head().await {
647            Ok(position) => {
648                // Return the position as a response
649                Ok(Response::new(umadb_proto::v1::HeadResponse { position }))
650            }
651            Err(e) => Err(status_from_dcb_error(e)),
652        }
653    }
654
655    async fn get_tracking_info(
656        &self,
657        request: Request<umadb_proto::v1::TrackingRequest>,
658    ) -> Result<Response<umadb_proto::v1::TrackingResponse>, Status> {
659        // Enforce API key if configured
660        if let Some(expected) = &self.api_key {
661            let auth = request.metadata().get("authorization");
662            let expected_val = format!("Bearer {}", expected);
663            let ok = auth
664                .and_then(|m| m.to_str().ok())
665                .map(|s| s == expected_val)
666                .unwrap_or(false);
667            if !ok {
668                return Err(status_from_dcb_error(DCBError::AuthenticationError(
669                    "missing or invalid API key".to_string(),
670                )));
671            }
672        }
673        let req = request.into_inner();
674        match self.request_handler.get_tracking_info(req.source).await {
675            Ok(position) => Ok(Response::new(umadb_proto::v1::TrackingResponse {
676                position,
677            })),
678            Err(e) => Err(status_from_dcb_error(e)),
679        }
680    }
681}
682
// Message types for communication between the gRPC server and the request handler's writer thread
enum WriterRequest {
    /// Append a batch of events, optionally guarded by a DCB append condition
    /// and/or upstream tracking info; the outcome (new head position or error)
    /// is reported back through `response_tx`.
    Append {
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        tracking_info: Option<TrackingInfo>,
        // One-shot channel on which the writer thread reports the append result.
        response_tx: oneshot::Sender<DCBResult<u64>>,
    },
    /// Ask the writer thread to stop processing and exit.
    Shutdown,
}
693
// Thread-safe request handler
struct RequestHandler {
    // Shared MVCC storage; reads run against this directly and concurrently.
    mvcc: Arc<Mvcc>,
    // Broadcasts the latest head position so subscription streams can wake up.
    head_watch_tx: watch::Sender<Option<u64>>,
    // Queue feeding the dedicated single-writer thread (see WriterRequest).
    writer_request_tx: mpsc::Sender<WriterRequest>,
}
700
701impl RequestHandler {
702    fn new<P: AsRef<Path> + Send + 'static>(path: P) -> DCBResult<Self> {
703        // Create a channel for sending requests to the writer thread
704        let (request_tx, mut request_rx) = mpsc::channel::<WriterRequest>(1024);
705
706        // Build a shared Mvcc instance (Arc) upfront so reads can proceed concurrently
707        let p = path.as_ref();
708        let file_path = if p.is_dir() {
709            p.join(DEFAULT_DB_FILENAME)
710        } else {
711            p.to_path_buf()
712        };
713        let mvcc = Arc::new(Mvcc::new(&file_path, DEFAULT_PAGE_SIZE, false)?);
714
715        // Initialize the head watch channel with the current head.
716        let init_head = {
717            let (_, header) = mvcc.get_latest_header()?;
718            let last = header.next_position.0.saturating_sub(1);
719            if last == 0 { None } else { Some(last) }
720        };
721        let (head_tx, _head_rx) = watch::channel::<Option<u64>>(init_head);
722
723        // Spawn a thread for processing writer requests.
724        let mvcc_for_writer = mvcc.clone();
725        let head_tx_writer = head_tx.clone();
726        thread::spawn(move || {
727            let db = UmaDB::from_arc(mvcc_for_writer);
728
729            // Create a runtime for processing writer requests.
730            let rt = Runtime::new().unwrap();
731
732            // Process writer requests.
733            rt.block_on(async {
734                while let Some(request) = request_rx.recv().await {
735                    match request {
736                        WriterRequest::Append {
737                            events,
738                            condition,
739                            tracking_info,
740                            response_tx,
741                        } => {
742                            // Batch processing: drain any immediately available requests
743                            // let mut items: Vec<(Vec<DCBEvent>, Option<DCBAppendCondition>)> =
744                            //     Vec::new();
745
746                            let mut total_events = 0;
747                            total_events += events.len();
748                            // items.push((events, condition));
749
750                            let mvcc = &db.mvcc;
751                            let mut writer = match mvcc.writer() {
752                                Ok(writer) => writer,
753                                Err(err) => {
754                                    let _ = response_tx.send(Err(err));
755                                    continue;
756                                }
757                            };
758
759                            let mut responders: Vec<oneshot::Sender<DCBResult<u64>>> = Vec::new();
760                            let mut results: Vec<DCBResult<u64>> = Vec::new();
761
762                            responders.push(response_tx);
763                            let mut result = UmaDB::process_append_request(
764                                events,
765                                condition,
766                                tracking_info,
767                                mvcc,
768                                &mut writer,
769                            );
770                            results.push(result);
771
772                            // Drain the channel for more pending writer requests without awaiting.
773                            // Important: do not drop a popped request when hitting the batch limit.
774                            // We stop draining BEFORE attempting to recv if we've reached the limit.
775                            loop {
776                                if total_events >= APPEND_BATCH_MAX_EVENTS {
777                                    break;
778                                }
779                                match request_rx.try_recv() {
780                                    Ok(WriterRequest::Append {
781                                        events,
782                                        condition,
783                                        tracking_info,
784                                        response_tx,
785                                    }) => {
786                                        let ev_len = events.len();
787                                        responders.push(response_tx);
788                                        result = UmaDB::process_append_request(
789                                            events,
790                                            condition,
791                                            tracking_info,
792                                            mvcc,
793                                            &mut writer,
794                                        );
795                                        results.push(result);
796                                        total_events += ev_len;
797                                    }
798                                    Ok(WriterRequest::Shutdown) => {
799                                        // Push back the shutdown signal by breaking and letting
800                                        // outer loop handle after batch. We'll process the
801                                        // current batch first, then break the outer loop on
802                                        // the next iteration when the channel is empty.
803                                        break;
804                                    }
805                                    Err(mpsc::error::TryRecvError::Empty) => {
806                                        break;
807                                    }
808                                    Err(mpsc::error::TryRecvError::Disconnected) => break,
809                                }
810                            }
811                            // println!("Total events: {total_events}");
812
813                            // Single commit at the end of the batch
814                            let batch_result = match mvcc.commit(&mut writer) {
815                                Ok(_) => Ok(results),
816                                Err(err) => Err(err),
817                            };
818
819                            match batch_result {
820                                Ok(results) => {
821                                    // Send individual results back to requesters
822                                    // Also compute the new head as the maximum successful last position in this batch
823                                    let mut max_ok: Option<u64> = None;
824                                    for (res, tx) in results.into_iter().zip(responders.into_iter())
825                                    {
826                                        if let Ok(v) = &res {
827                                            max_ok = Some(max_ok.map_or(*v, |m| m.max(*v)));
828                                        }
829                                        let _ = tx.send(res);
830                                    }
831                                    // After a successful batch commit, publish the updated head.
832                                    if let Some(h) = max_ok {
833                                        let _ = head_tx_writer.send(Some(h));
834                                    }
835                                }
836                                Err(e) => {
837                                    // If the batch failed as a whole (e.g., commit failed), propagate the SAME error to all responders.
838                                    // DCBError is not Clone (contains io::Error), so reconstruct a best-effort copy by using its Display text
839                                    // for Io and cloning data for other variants.
840                                    let total = responders.len();
841                                    let mut iter = responders.into_iter();
842                                    for _ in 0..total {
843                                        if let Some(tx) = iter.next() {
844                                            let _ = tx.send(Err(clone_dcb_error(&e)));
845                                        }
846                                    }
847                                }
848                            }
849                        }
850                        WriterRequest::Shutdown => {
851                            break;
852                        }
853                    }
854                }
855            });
856        });
857
858        Ok(Self {
859            mvcc,
860            head_watch_tx: head_tx,
861            writer_request_tx: request_tx,
862        })
863    }
864
865    async fn read(
866        &self,
867        query: Option<DCBQuery>,
868        start: Option<u64>,
869        backwards: bool,
870        limit: Option<u32>,
871    ) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
872        let reader = self.mvcc.reader()?;
873        let last_committed_position = reader.next_position.0.saturating_sub(1);
874
875        let q = query.unwrap_or(DCBQuery { items: vec![] });
876        let start_position = start.map(Position);
877
878        let events = read_conditional(
879            &self.mvcc,
880            &std::collections::HashMap::new(),
881            reader.events_tree_root_id,
882            reader.tags_tree_root_id,
883            q,
884            start_position,
885            backwards,
886            limit,
887            false,
888        )
889        .map_err(|e| DCBError::Corruption(format!("{e}")))?;
890
891        let head = if limit.is_none() {
892            if last_committed_position == 0 {
893                None
894            } else {
895                Some(last_committed_position)
896            }
897        } else {
898            events.last().map(|e| e.position)
899        };
900
901        Ok((events, head))
902    }
903
904    async fn head(&self) -> DCBResult<Option<u64>> {
905        let (_, header) = self
906            .mvcc
907            .get_latest_header()
908            .map_err(|e| DCBError::Corruption(format!("{e}")))?;
909        let last = header.next_position.0.saturating_sub(1);
910        if last == 0 { Ok(None) } else { Ok(Some(last)) }
911    }
912
913    async fn get_tracking_info(&self, source: String) -> DCBResult<Option<u64>> {
914        let db = UmaDB::from_arc(self.mvcc.clone());
915        db.get_tracking_info(&source)
916    }
917
    /// Appends `events`, optionally guarded by an append `condition` and
    /// accompanied by optional upstream `tracking_info`.
    ///
    /// On success returns the position reported by the writer thread; for a
    /// request detected as an idempotent replay, returns the position at
    /// which the events were previously recorded without appending again.
    ///
    /// # Errors
    /// Returns [`DCBError::IntegrityError`] when the condition matches an
    /// existing event and the request is not an idempotent replay, or
    /// [`DCBError::Io`] when the writer thread's channel is closed.
    pub async fn append(
        &self,
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        tracking_info: Option<TrackingInfo>,
    ) -> DCBResult<u64> {
        // Optimistic pre-check of the given condition against a read-only
        // MVCC snapshot, performed on the caller's task before the request is
        // queued for the single writer thread.
        let pre_append_decision = if let Some(mut given_condition) = condition {
            let reader = self.mvcc.reader()?;
            // Head as observed by this snapshot; 0 means "empty store".
            let current_head = {
                let last = reader.next_position.0.saturating_sub(1);
                if last == 0 { None } else { Some(last) }
            };

            // Probe the snapshot for ONE event matching the failure query,
            // starting just after the condition's `after` position
            // (exclusive bound converted to an inclusive start).
            let from = given_condition.after.map(|after| Position(after + 1));
            let empty_dirty = std::collections::HashMap::new();
            let found = read_conditional(
                &self.mvcc,
                &empty_dirty,
                reader.events_tree_root_id,
                reader.tags_tree_root_id,
                given_condition.fail_if_events_match.clone(),
                from,
                false,
                Some(1),
                false,
            )?;

            if let Some(matched) = found.first() {
                // A matching event exists — before failing, check whether this
                // request is an exact replay of events already recorded.
                match is_request_idempotent(
                    &self.mvcc,
                    &empty_dirty,
                    reader.events_tree_root_id,
                    reader.tags_tree_root_id,
                    &events,
                    given_condition.fail_if_events_match.clone(),
                    from,
                ) {
                    Ok(Some(last_recorded_position)) => {
                        // Request is idempotent; skip the actual append and
                        // report where the events already live.
                        PreAppendDecision::AlreadyAppended(last_recorded_position)
                    }
                    Ok(None) => {
                        // Genuine integrity violation: the condition matched
                        // and the request is not a replay.
                        let msg = format!(
                            "condition: {:?} matched: {:?}",
                            given_condition.clone(),
                            matched,
                        );
                        return Err(DCBError::IntegrityError(msg));
                    }
                    Err(err) => {
                        // Propagate underlying read error
                        return Err(err);
                    }
                }
            } else {
                // No match found: advance `after` to the head observed by this
                // reader — presumably so the writer thread only re-validates
                // events committed after this snapshot (TODO confirm intent).
                let new_after = std::cmp::max(
                    given_condition.after.unwrap_or(0),
                    current_head.unwrap_or(0),
                );
                given_condition.after = Some(new_after);

                PreAppendDecision::UseCondition(Some(given_condition))
            }
        } else {
            // No condition provided at all — append unconditionally.
            PreAppendDecision::UseCondition(None)
        };

        // Handle the pre-check decision
        match pre_append_decision {
            PreAppendDecision::AlreadyAppended(last_found_position) => {
                // ✅ Request was idempotent — just return the existing position.
                Ok(last_found_position)
            }
            PreAppendDecision::UseCondition(adjusted_condition) => {
                // ✅ Proceed with append operation on the writer thread.
                let (response_tx, response_rx) = oneshot::channel();

                self.writer_request_tx
                    .send(WriterRequest::Append {
                        events,
                        condition: adjusted_condition,
                        tracking_info,
                        response_tx,
                    })
                    .await
                    .map_err(|_| {
                        DCBError::Io(std::io::Error::other(
                            "Failed to send append request to EventStore thread",
                        ))
                    })?;

                // The writer replies after committing the batch that contains
                // this request; a dropped sender means the writer exited.
                response_rx.await.map_err(|_| {
                    DCBError::Io(std::io::Error::other(
                        "Failed to receive append response from EventStore thread",
                    ))
                })?
            }
        }
    }
1023
    /// Returns a fresh [`watch::Receiver`] that observes head-position
    /// updates published by the writer thread after committed batches.
    fn watch_head(&self) -> watch::Receiver<Option<u64>> {
        self.head_watch_tx.subscribe()
    }
1027
1028    #[allow(dead_code)]
1029    async fn shutdown(&self) {
1030        let _ = self.writer_request_tx.send(WriterRequest::Shutdown).await;
1031    }
1032}
1033
1034fn clone_dcb_error(src: &DCBError) -> DCBError {
1035    match src {
1036        DCBError::AuthenticationError(err) => DCBError::AuthenticationError(err.to_string()),
1037        DCBError::InitializationError(err) => DCBError::InitializationError(err.to_string()),
1038        DCBError::Io(err) => DCBError::Io(std::io::Error::other(err.to_string())),
1039        DCBError::IntegrityError(s) => DCBError::IntegrityError(s.clone()),
1040        DCBError::Corruption(s) => DCBError::Corruption(s.clone()),
1041        DCBError::InvalidArgument(s) => DCBError::InvalidArgument(s.clone()),
1042        DCBError::PageNotFound(id) => DCBError::PageNotFound(*id),
1043        DCBError::DirtyPageNotFound(id) => DCBError::DirtyPageNotFound(*id),
1044        DCBError::RootIDMismatch(old_id, new_id) => DCBError::RootIDMismatch(*old_id, *new_id),
1045        DCBError::DatabaseCorrupted(s) => DCBError::DatabaseCorrupted(s.clone()),
1046        DCBError::InternalError(s) => DCBError::InternalError(s.clone()),
1047        DCBError::SerializationError(s) => DCBError::SerializationError(s.clone()),
1048        DCBError::DeserializationError(s) => DCBError::DeserializationError(s.clone()),
1049        DCBError::PageAlreadyFreed(id) => DCBError::PageAlreadyFreed(*id),
1050        DCBError::PageAlreadyDirty(id) => DCBError::PageAlreadyDirty(*id),
1051        DCBError::TransportError(err) => DCBError::TransportError(err.clone()),
1052        DCBError::CancelledByUser() => DCBError::CancelledByUser(),
1053    }
1054}
1055
1056// Clone implementation for EventStoreHandle
1057impl Clone for RequestHandler {
1058    fn clone(&self) -> Self {
1059        Self {
1060            mvcc: self.mvcc.clone(),
1061            head_watch_tx: self.head_watch_tx.clone(),
1062            writer_request_tx: self.writer_request_tx.clone(),
1063        }
1064    }
1065}
1066
/// Outcome of the optimistic pre-check performed by `append` before the
/// request is forwarded to the single writer thread.
#[derive(Debug)]
enum PreAppendDecision {
    /// Proceed with the append using this (possibly adjusted) condition;
    /// `None` means the append is unconditional.
    UseCondition(Option<DCBAppendCondition>),
    /// Skip the append because the request was detected as an idempotent
    /// replay; report the last recorded position to the caller instead.
    AlreadyAppended(u64),
}