// umadb_server/lib.rs

1use futures::Stream;
2use std::fs;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::LazyLock;
7use std::thread;
8use std::time::Instant;
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio_stream::wrappers::ReceiverStream;
11use tonic::transport::{Identity, ServerTlsConfig};
12use tonic::{Request, Response, Status, transport::Server};
13use umadb_core::db::{
14    DEFAULT_DB_FILENAME, DEFAULT_PAGE_SIZE, UmaDB, is_request_idempotent, read_conditional,
15};
16use umadb_core::mvcc::Mvcc;
17use umadb_dcb::{DCBAppendCondition, DCBError, DCBEvent, DCBQuery, DCBResult, DCBSequencedEvent};
18
19use tokio::runtime::Runtime;
20use tonic::codegen::http;
21use tonic::transport::server::TcpIncoming;
22use umadb_core::common::Position;
23
24use std::convert::Infallible;
25use std::future::Future;
26use std::task::{Context, Poll};
27use tonic::server::NamedService;
28use umadb_proto::status_from_dcb_error;
29
// This is just to maintain compatibility for the very early unversioned API (pre-v1).
// tower middleware that rewrites legacy "/umadb.UmaDBService/..." request paths
// to the current "/umadb.v1.DCB/..." form before delegating to `inner`.
#[derive(Clone, Debug)]
pub struct PathRewriterService<S> {
    // The wrapped service; requests are forwarded here after URI rewriting.
    inner: S,
}
35
36impl<S> tower::Service<http::Request<tonic::body::Body>> for PathRewriterService<S>
37where
38    S: tower::Service<
39            http::Request<tonic::body::Body>,
40            Response = http::Response<tonic::body::Body>,
41            Error = Infallible,
42        > + Clone
43        + Send
44        + 'static,
45    S::Future: Send + 'static,
46{
47    type Response = S::Response;
48    type Error = S::Error;
49    type Future =
50        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
51
52    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53        self.inner.poll_ready(cx)
54    }
55
56    fn call(&mut self, mut req: http::Request<tonic::body::Body>) -> Self::Future {
57        let uri = req.uri().clone();
58        let path = uri.path();
59
60        // Check and rewrite the path string first
61        if path.starts_with("/umadb.UmaDBService/") {
62            let new_path_str = path.replace("/umadb.UmaDBService/", "/umadb.v1.DCB/");
63
64            // Use the existing authority and scheme if present, otherwise default to a simple path-only URI structure
65            // which is often safer than hardcoded hostnames in internal systems.
66            let new_uri = if let (Some(scheme), Some(authority)) = (uri.scheme(), uri.authority()) {
67                // If we have all components, try to build the full URI
68                http::Uri::builder()
69                    .scheme(scheme.clone())
70                    .authority(authority.clone())
71                    .path_and_query(new_path_str.as_str())
72                    .build()
73                    .ok() // Convert the final build Result into an Option
74            } else {
75                // Fallback for malformed requests (missing scheme/authority)
76                // Just try to build a path-only URI
77                new_path_str.parse::<http::Uri>().ok()
78            };
79
80            if let Some(final_uri) = new_uri {
81                *req.uri_mut() = final_uri;
82            } else {
83                eprintln!("Failed to construct valid URI for path: {}", path);
84            }
85        }
86
87        let fut = self.inner.call(req);
88        Box::pin(fut)
89    }
90}
91
// Forward the gRPC service name from the wrapped service so tonic's router
// still routes by the inner service's canonical name.
impl<S: NamedService> NamedService for PathRewriterService<S> {
    const NAME: &'static str = S::NAME;
}
96
97#[derive(Clone, Debug)]
98pub struct PathRewriterLayer;
99
100impl<S> tower::Layer<S> for PathRewriterLayer
101where
102    S: tower::Service<
103            http::Request<tonic::body::Body>,
104            Response = http::Response<tonic::body::Body>,
105            Error = Infallible,
106        > + Clone
107        + Send
108        + 'static,
109    S::Future: Send + 'static,
110{
111    type Service = PathRewriterService<S>;
112
113    fn layer(&self, inner: S) -> Self::Service {
114        PathRewriterService { inner }
115    }
116}
117
// Recorded lazily on first access; all uptime reports measure from here.
static START_TIME: LazyLock<Instant> = LazyLock::new(Instant::now);

// Upper bound on the number of events coalesced into one writer batch.
const APPEND_BATCH_MAX_EVENTS: usize = 2000;
// Default and maximum number of events delivered per streamed ReadResponse.
const READ_RESPONSE_BATCH_SIZE_DEFAULT: u32 = 100;
const READ_RESPONSE_BATCH_SIZE_MAX: u32 = 5000;

// Optional TLS configuration helpers
/// PEM-encoded certificate/key pair used to enable TLS on the server.
#[derive(Clone, Debug)]
pub struct ServerTlsOptions {
    pub cert_pem: Vec<u8>,
    pub key_pem: Vec<u8>,
}

/// How long this process has been running, measured from the first time
/// `START_TIME` was touched.
pub fn uptime() -> std::time::Duration {
    let started_at: Instant = *START_TIME;
    Instant::now().duration_since(started_at)
}
134
135fn build_server_builder_with_options(tls: Option<ServerTlsOptions>) -> Server {
136    use std::time::Duration;
137    let mut server_builder = Server::builder()
138        .http2_keepalive_interval(Some(Duration::from_secs(5)))
139        .http2_keepalive_timeout(Some(Duration::from_secs(10)))
140        .initial_stream_window_size(Some(4 * 1024 * 1024))
141        .initial_connection_window_size(Some(8 * 1024 * 1024))
142        .tcp_nodelay(true)
143        .concurrency_limit_per_connection(1024);
144
145    if let Some(opts) = tls {
146        let identity = Identity::from_pem(opts.cert_pem, opts.key_pem);
147        server_builder = server_builder
148            .tls_config(ServerTlsConfig::new().identity(identity))
149            .expect("failed to apply TLS config");
150    }
151
152    server_builder
153}
154
// Function to start the gRPC server with a shutdown signal
/// Starts the UmaDB gRPC server without TLS and without API-key authentication.
///
/// * `path` — database file, or a directory (the default DB filename is used).
/// * `addr` — socket address string, e.g. "127.0.0.1:50051".
/// * `shutdown_rx` — resolving this channel triggers a graceful shutdown.
pub async fn start_server<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None, None).await
}
163
/// Start server with TLS using PEM-encoded cert and key.
///
/// Same semantics as [`start_server`], plus a TLS identity built from the
/// given PEM-encoded certificate and private-key bytes.
pub async fn start_server_secure<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    cert_pem: Vec<u8>,
    key_pem: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error>> {
    let tls = ServerTlsOptions { cert_pem, key_pem };
    start_server_internal(path, addr, shutdown_rx, Some(tls), None).await
}
175
176/// Convenience: load cert and key from filesystem paths
177pub async fn start_server_secure_from_files<
178    P: AsRef<Path> + Send + 'static,
179    CP: AsRef<Path>,
180    KP: AsRef<Path>,
181>(
182    path: P,
183    addr: &str,
184    shutdown_rx: oneshot::Receiver<()>,
185    cert_path: CP,
186    key_path: KP,
187) -> Result<(), Box<dyn std::error::Error>> {
188    let cert_path_ref = cert_path.as_ref();
189    let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
190        format!(
191            "Failed to open TLS certificate file '{}': {}",
192            cert_path_ref.display(),
193            e
194        )
195        .into()
196    })?;
197
198    let key_path_ref = key_path.as_ref();
199    let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
200        format!(
201            "Failed to open TLS key file '{}': {}",
202            key_path_ref.display(),
203            e
204        )
205        .into()
206    })?;
207    start_server_secure(path, addr, shutdown_rx, cert_pem, key_pem).await
208}
209
/// Start server (insecure) requiring an API key for all RPCs.
///
/// Clients must send `authorization: Bearer <api_key>` metadata on every RPC;
/// requests without it are rejected by the RPC handlers.
pub async fn start_server_with_api_key<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    api_key: String,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None, Some(api_key)).await
}
219
/// Start TLS server requiring an API key for all RPCs.
///
/// Combines [`start_server_secure`] (PEM-encoded cert/key) with the
/// `authorization: Bearer <api_key>` requirement of
/// [`start_server_with_api_key`].
pub async fn start_server_secure_with_api_key<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    cert_pem: Vec<u8>,
    key_pem: Vec<u8>,
    api_key: String,
) -> Result<(), Box<dyn std::error::Error>> {
    let tls = ServerTlsOptions { cert_pem, key_pem };
    start_server_internal(path, addr, shutdown_rx, Some(tls), Some(api_key)).await
}
232
233/// TLS server from files requiring an API key
234pub async fn start_server_secure_from_files_with_api_key<
235    P: AsRef<Path> + Send + 'static,
236    CP: AsRef<Path>,
237    KP: AsRef<Path>,
238>(
239    path: P,
240    addr: &str,
241    shutdown_rx: oneshot::Receiver<()>,
242    cert_path: CP,
243    key_path: KP,
244    api_key: String,
245) -> Result<(), Box<dyn std::error::Error>> {
246    let cert_path_ref = cert_path.as_ref();
247    let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
248        format!(
249            "Failed to open TLS certificate file '{}': {}",
250            cert_path_ref.display(),
251            e
252        )
253        .into()
254    })?;
255
256    let key_path_ref = key_path.as_ref();
257    let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
258        format!(
259            "Failed to open TLS key file '{}': {}",
260            key_path_ref.display(),
261            e
262        )
263        .into()
264    })?;
265    start_server_secure_with_api_key(path, addr, shutdown_rx, cert_pem, key_pem, api_key).await
266}
267
/// Shared implementation behind every `start_server*` entry point.
///
/// Binds `addr`, opens the database at `path`, registers the gRPC health
/// service and the DCB service (behind `PathRewriterLayer` for pre-v1 path
/// compatibility), and serves until `shutdown_rx` resolves.
async fn start_server_internal<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    tls: Option<ServerTlsOptions>,
    api_key: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let addr = addr.parse()?;
    // ---- Bind incoming manually like tonic ----
    // Binding ourselves (rather than letting tonic do it) lets us set socket
    // options and report bind failures as a typed DCBError.
    let incoming = match TcpIncoming::bind(addr) {
        Ok(incoming) => incoming,
        Err(err) => {
            return Err(Box::new(DCBError::InitializationError(format!(
                "Failed to bind to address {}: {}",
                addr, err
            ))));
        }
    }
    .with_nodelay(Some(true))
    .with_keepalive(Some(std::time::Duration::from_secs(60)));

    // Create a shutdown broadcast channel for terminating ongoing subscriptions
    let (srv_shutdown_tx, srv_shutdown_rx) = watch::channel(false);
    let dcb_server =
        match DCBServer::new(path.as_ref().to_owned(), srv_shutdown_rx, api_key.clone()) {
            Ok(server) => server,
            Err(err) => {
                return Err(Box::new(err));
            }
        };

    // Log the current head position at startup (None is reported as 0).
    println!(
        "UmaDB has {:?} events",
        dcb_server.request_handler.head().await?.unwrap_or(0)
    );
    let tls_mode_display_str = if tls.is_some() {
        "with TLS"
    } else {
        "without TLS"
    };

    let api_key_display_str = if api_key.is_some() {
        "with API key"
    } else {
        "without API key"
    };

    // gRPC Health service setup
    use tonic_health::ServingStatus; // server API expects this enum
    let (health_reporter, health_service) = tonic_health::server::health_reporter();
    // Set overall and service-specific health to SERVING
    health_reporter
        .set_service_status("", ServingStatus::Serving)
        .await;
    health_reporter
        .set_service_status("umadb.v1.DCB", ServingStatus::Serving)
        .await;
    let health_reporter_for_shutdown = health_reporter.clone();

    // Apply PathRewriterLayer at the server level to intercept all requests before routing
    let mut builder = build_server_builder_with_options(tls)
        .layer(PathRewriterLayer)
        .add_service(health_service);

    // Add DCB service (auth enforced inside RPC handlers if configured)
    builder = builder.add_service(dcb_server.into_service());
    let router = builder;

    println!("UmaDB is listening on {addr} ({tls_mode_display_str}, {api_key_display_str})");
    println!("UmaDB started in {:?}", uptime());
    router
        .serve_with_incoming_shutdown(incoming, async move {
            // Wait for an external shutdown trigger
            let _ = shutdown_rx.await;
            // Mark health as NOT_SERVING before shutdown
            let _ = health_reporter_for_shutdown
                .set_service_status("", ServingStatus::NotServing)
                .await;
            let _ = health_reporter_for_shutdown
                .set_service_status("umadb.v1.DCB", ServingStatus::NotServing)
                .await;
            // Broadcast shutdown to all subscription tasks
            let _ = srv_shutdown_tx.send(true);
            println!("UmaDB server shutdown complete");
        })
        .await?;

    Ok(())
}
358
// gRPC server implementation
pub struct DCBServer {
    // Serves reads directly and forwards appends to the writer thread.
    pub(crate) request_handler: RequestHandler,
    // Becomes `true` when the server is shutting down; observed by long-lived
    // subscription streams so they can terminate promptly.
    shutdown_watch_rx: watch::Receiver<bool>,
    // When set, every RPC must present `authorization: Bearer <key>` metadata.
    api_key: Option<String>,
}
365
366impl DCBServer {
367    pub fn new<P: AsRef<Path> + Send + 'static>(
368        path: P,
369        shutdown_rx: watch::Receiver<bool>,
370        api_key: Option<String>,
371    ) -> DCBResult<Self> {
372        let command_handler = RequestHandler::new(path)?;
373        Ok(Self {
374            request_handler: command_handler,
375            shutdown_watch_rx: shutdown_rx,
376            api_key,
377        })
378    }
379
380    pub fn into_service(self) -> umadb_proto::v1::dcb_server::DcbServer<Self> {
381        umadb_proto::v1::dcb_server::DcbServer::new(self)
382    }
383}
384
385#[tonic::async_trait]
386impl umadb_proto::v1::dcb_server::Dcb for DCBServer {
387    type ReadStream =
388        Pin<Box<dyn Stream<Item = Result<umadb_proto::v1::ReadResponse, Status>> + Send + 'static>>;
389
390    async fn read(
391        &self,
392        request: Request<umadb_proto::v1::ReadRequest>,
393    ) -> Result<Response<Self::ReadStream>, Status> {
394        // Enforce API key if configured
395        if let Some(expected) = &self.api_key {
396            let auth = request.metadata().get("authorization");
397            let expected_val = format!("Bearer {}", expected);
398            let ok = auth
399                .and_then(|m| m.to_str().ok())
400                .map(|s| s == expected_val)
401                .unwrap_or(false);
402            if !ok {
403                return Err(status_from_dcb_error(DCBError::AuthenticationError(
404                    "missing or invalid API key".to_string(),
405                )));
406            }
407        }
408        let read_request = request.into_inner();
409
410        // Convert protobuf query to DCB types
411        let mut query: Option<DCBQuery> = read_request.query.map(|q| q.into());
412        let start = read_request.start;
413        let backwards = read_request.backwards.unwrap_or(false);
414        let limit = read_request.limit;
415        // Cap requested batch size.
416        let batch_size = read_request
417            .batch_size
418            .unwrap_or(READ_RESPONSE_BATCH_SIZE_DEFAULT)
419            .clamp(1, READ_RESPONSE_BATCH_SIZE_MAX);
420        let subscribe = read_request.subscribe.unwrap_or(false);
421
422        // Create a channel for streaming responses (deeper buffer to reduce backpressure under concurrency)
423        let (tx, rx) = mpsc::channel(2048);
424        // Clone the request handler.
425        let request_handler = self.request_handler.clone();
426        // Clone the shutdown watch receiver.
427        let mut shutdown_watch_rx = self.shutdown_watch_rx.clone();
428
429        // Spawn a task to handle the read operation and stream multiple batches
430        tokio::spawn(async move {
431            // Ensure we can reuse the same query across batches
432            let query_clone = query.take();
433            let mut next_start = start;
434            let mut sent_any = false;
435            let mut remaining_limit = limit.unwrap_or(u32::MAX);
436            // Create a watch receiver for head updates (for subscriptions)
437            // TODO: Make this an Option and only do this for subscriptions?
438            let mut head_rx = request_handler.watch_head();
439            // If non-subscription read, capture head to preserve point-in-time semantics
440            let captured_head = if !subscribe {
441                request_handler.head().await.unwrap_or(None)
442            } else {
443                None
444            };
445            loop {
446                // If this is a subscription, exit if the client
447                // has gone away or the server is shutting down.
448                if subscribe {
449                    if tx.is_closed() {
450                        break;
451                    }
452                    if *shutdown_watch_rx.borrow() {
453                        break;
454                    }
455                }
456                // Determine per-iteration limit.
457                let read_limit = remaining_limit.min(batch_size);
458                // If subscription and remaining exhausted (limit reached), terminate
459                if subscribe && limit.is_some() && remaining_limit == 0 {
460                    break;
461                }
462                match request_handler
463                    .read(query_clone.clone(), next_start, backwards, Some(read_limit))
464                    .await
465                {
466                    Ok((dcb_sequenced_events, head)) => {
467                        // Capture the original length before consuming events
468                        let original_len = dcb_sequenced_events.len();
469
470                        // Filter and map events, discarding those with position > captured_head
471                        let sequenced_event_protos: Vec<umadb_proto::v1::SequencedEvent> =
472                            dcb_sequenced_events
473                                .into_iter()
474                                .filter(|e| {
475                                    if let Some(h) = captured_head {
476                                        e.position <= h
477                                    } else {
478                                        true
479                                    }
480                                })
481                                .map(umadb_proto::v1::SequencedEvent::from)
482                                .collect();
483
484                        let reached_captured_head = if captured_head.is_some() {
485                            // Check if we filtered out any events
486                            sequenced_event_protos.len() < original_len
487                        } else {
488                            false
489                        };
490
491                        // Calculate head to send based on context
492                        // For subscriptions: use current head
493                        // For unlimited non-subscription reads: use captured_head
494                        // For limited reads: use last event position (or current head if empty)
495                        let last_event_position = sequenced_event_protos.last().map(|e| e.position);
496                        let head_to_send = if subscribe {
497                            head
498                        } else if limit.is_none() {
499                            captured_head
500                        } else {
501                            last_event_position.or(head)
502                        };
503
504                        if sequenced_event_protos.is_empty() {
505                            // Only send an empty response to communicate head if this is the first
506                            if !sent_any {
507                                let response = umadb_proto::v1::ReadResponse {
508                                    events: vec![],
509                                    head: head_to_send,
510                                };
511                                let _ = tx.send(Ok(response)).await;
512                            }
513                            // For subscriptions, wait for new events instead of terminating
514                            if subscribe {
515                                // Wait while head <= next_after (or None)
516                                loop {
517                                    // Stop if the channel is closed.
518                                    if tx.is_closed() {
519                                        break;
520                                    }
521                                    let current_head = *head_rx.borrow();
522                                    if current_head
523                                        .map(|h| h >= next_start.unwrap_or(1))
524                                        .unwrap_or(false)
525                                    {
526                                        break; // Break out of waiting, new events are available.
527                                    }
528                                    // Wait for either a new head or a server shutdown signal
529                                    tokio::select! {
530                                        res = head_rx.changed() => {
531                                            if res.is_err() { break; }
532                                        }
533                                        res2 = shutdown_watch_rx.changed() => {
534                                            if res2.is_ok() {
535                                                // Exit if shutting down.
536                                                if *shutdown_watch_rx.borrow() { break; }
537                                            } else {
538                                                break; // sender dropped
539                                            }
540                                        }
541                                    }
542                                }
543                                continue;
544                            }
545                            break;
546                        }
547
548                        // Capture values needed after sequenced_event_protos is moved
549                        let sent_count = sequenced_event_protos.len() as u32;
550
551                        let response = umadb_proto::v1::ReadResponse {
552                            events: sequenced_event_protos,
553                            head: head_to_send,
554                        };
555
556                        if tx.send(Ok(response)).await.is_err() {
557                            break;
558                        }
559                        sent_any = true;
560
561                        // Advance the cursor (use a new reader on the next loop iteration)
562                        next_start =
563                            last_event_position.map(|p| if !backwards { p + 1 } else { p - 1 });
564
565                        // Stop streaming further if we reached the
566                        // captured head boundary (non-subscriber only).
567                        if reached_captured_head && !subscribe {
568                            break;
569                        }
570
571                        // Decrease the remaining overall limit if any, and stop if reached
572                        if limit.is_some() {
573                            if remaining_limit <= sent_count {
574                                remaining_limit = 0;
575                            } else {
576                                remaining_limit -= sent_count;
577                            }
578                            if remaining_limit == 0 {
579                                break;
580                            }
581                        }
582
583                        // Yield to let other tasks progress under high concurrency
584                        tokio::task::yield_now().await;
585                    }
586                    Err(e) => {
587                        let _ = tx.send(Err(status_from_dcb_error(e))).await;
588                        break;
589                    }
590                }
591            }
592        });
593
594        // Return the receiver as a stream
595        Ok(Response::new(
596            Box::pin(ReceiverStream::new(rx)) as Self::ReadStream
597        ))
598    }
599
600    async fn append(
601        &self,
602        request: Request<umadb_proto::v1::AppendRequest>,
603    ) -> Result<Response<umadb_proto::v1::AppendResponse>, Status> {
604        // Enforce API key if configured
605        if let Some(expected) = &self.api_key {
606            let auth = request.metadata().get("authorization");
607            let expected_val = format!("Bearer {}", expected);
608            let ok = auth
609                .and_then(|m| m.to_str().ok())
610                .map(|s| s == expected_val)
611                .unwrap_or(false);
612            if !ok {
613                return Err(status_from_dcb_error(DCBError::AuthenticationError(
614                    "missing or invalid API key".to_string(),
615                )));
616            }
617        }
618        let req = request.into_inner();
619
620        // Convert protobuf types to API types
621        let events: Vec<DCBEvent> = match req.events.into_iter().map(|e| e.try_into()).collect() {
622            Ok(events) => events,
623            Err(e) => {
624                return Err(status_from_dcb_error(e));
625            }
626        };
627        let condition = req.condition.map(|c| c.into());
628
629        // Call the event store append method
630        match self.request_handler.append(events, condition).await {
631            Ok(position) => Ok(Response::new(umadb_proto::v1::AppendResponse { position })),
632            Err(e) => Err(status_from_dcb_error(e)),
633        }
634    }
635
636    async fn head(
637        &self,
638        request: Request<umadb_proto::v1::HeadRequest>,
639    ) -> Result<Response<umadb_proto::v1::HeadResponse>, Status> {
640        // Enforce API key if configured
641        if let Some(expected) = &self.api_key {
642            let auth = request.metadata().get("authorization");
643            let expected_val = format!("Bearer {}", expected);
644            let ok = auth
645                .and_then(|m| m.to_str().ok())
646                .map(|s| s == expected_val)
647                .unwrap_or(false);
648            if !ok {
649                return Err(status_from_dcb_error(DCBError::AuthenticationError(
650                    "missing or invalid API key".to_string(),
651                )));
652            }
653        }
654        // Call the event store head method
655        match self.request_handler.head().await {
656            Ok(position) => {
657                // Return the position as a response
658                Ok(Response::new(umadb_proto::v1::HeadResponse { position }))
659            }
660            Err(e) => Err(status_from_dcb_error(e)),
661        }
662    }
663}
664
// Message types for communication between the gRPC server and the request handler's writer thread
enum WriterRequest {
    // Append `events`, optionally guarded by `condition`; the outcome (new
    // position or error) is delivered back through `response_tx`.
    Append {
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        response_tx: oneshot::Sender<DCBResult<u64>>,
    },
    // Ask the writer thread to stop processing requests.
    Shutdown,
}
674
// Thread-safe request handler
struct RequestHandler {
    // Shared MVCC storage; reads proceed concurrently with the writer thread.
    mvcc: Arc<Mvcc>,
    // Broadcasts the latest head position to subscription streams.
    head_watch_tx: watch::Sender<Option<u64>>,
    // Queue of append/shutdown requests consumed by the dedicated writer thread.
    writer_request_tx: mpsc::Sender<WriterRequest>,
}
681
682impl RequestHandler {
683    fn new<P: AsRef<Path> + Send + 'static>(path: P) -> DCBResult<Self> {
684        // Create a channel for sending requests to the writer thread
685        let (request_tx, mut request_rx) = mpsc::channel::<WriterRequest>(1024);
686
687        // Build a shared Mvcc instance (Arc) upfront so reads can proceed concurrently
688        let p = path.as_ref();
689        let file_path = if p.is_dir() {
690            p.join(DEFAULT_DB_FILENAME)
691        } else {
692            p.to_path_buf()
693        };
694        let mvcc = Arc::new(Mvcc::new(&file_path, DEFAULT_PAGE_SIZE, false)?);
695
696        // Initialize the head watch channel with the current head.
697        let init_head = {
698            let (_, header) = mvcc.get_latest_header()?;
699            let last = header.next_position.0.saturating_sub(1);
700            if last == 0 { None } else { Some(last) }
701        };
702        let (head_tx, _head_rx) = watch::channel::<Option<u64>>(init_head);
703
704        // Spawn a thread for processing writer requests.
705        let mvcc_for_writer = mvcc.clone();
706        let head_tx_writer = head_tx.clone();
707        thread::spawn(move || {
708            let db = UmaDB::from_arc(mvcc_for_writer);
709
710            // Create a runtime for processing writer requests.
711            let rt = Runtime::new().unwrap();
712
713            // Process writer requests.
714            rt.block_on(async {
715                while let Some(request) = request_rx.recv().await {
716                    match request {
717                        WriterRequest::Append {
718                            events,
719                            condition,
720                            response_tx,
721                        } => {
722                            // Batch processing: drain any immediately available requests
723                            // let mut items: Vec<(Vec<DCBEvent>, Option<DCBAppendCondition>)> =
724                            //     Vec::new();
725                            let mut responders: Vec<oneshot::Sender<DCBResult<u64>>> = Vec::new();
726
727                            let mut total_events = 0;
728                            total_events += events.len();
729                            // items.push((events, condition));
730
731                            let mvcc = &db.mvcc;
732                            let mut writer = match mvcc.writer() {
733                                Ok(writer) => writer,
734                                Err(err) => {
735                                    let _ = response_tx.send(Err(err));
736                                    continue;
737                                }
738                            };
739                            responders.push(response_tx);
740
741                            let mut results: Vec<DCBResult<u64>> = Vec::new();
742
743                            UmaDB::process_append_request(
744                                events,
745                                condition,
746                                false,
747                                mvcc,
748                                &mut writer,
749                                &mut results,
750                            );
751
752                            // Drain the channel for more pending writer requests without awaiting.
753                            // Important: do not drop a popped request when hitting the batch limit.
754                            // We stop draining BEFORE attempting to recv if we've reached the limit.
755                            loop {
756                                if total_events >= APPEND_BATCH_MAX_EVENTS {
757                                    break;
758                                }
759                                match request_rx.try_recv() {
760                                    Ok(WriterRequest::Append {
761                                        events,
762                                        condition,
763                                        response_tx,
764                                    }) => {
765                                        let ev_len = events.len();
766                                        UmaDB::process_append_request(
767                                            events,
768                                            condition,
769                                            false,
770                                            mvcc,
771                                            &mut writer,
772                                            &mut results,
773                                        );
774                                        responders.push(response_tx);
775                                        total_events += ev_len;
776                                    }
777                                    Ok(WriterRequest::Shutdown) => {
778                                        // Push back the shutdown signal by breaking and letting
779                                        // outer loop handle after batch. We'll process the
780                                        // current batch first, then break the outer loop on
781                                        // the next iteration when the channel is empty.
782                                        break;
783                                    }
784                                    Err(mpsc::error::TryRecvError::Empty) => break,
785                                    Err(mpsc::error::TryRecvError::Disconnected) => break,
786                                }
787                            }
788                            // println!("Total events: {total_events}");
789                            // Execute a single batched append operation.
790
791                            // Single commit at the end of the batch
792                            let batch_result = match mvcc.commit(&mut writer) {
793                                Ok(_) => Ok(results),
794                                Err(err) => Err(err),
795                            };
796
797                            // let batch_result = db.append_batch(items, false);
798                            match batch_result {
799                                Ok(results) => {
800                                    // Send individual results back to requesters
801                                    // Also compute the new head as the maximum successful last position in this batch
802                                    let mut max_ok: Option<u64> = None;
803                                    for (res, tx) in results.into_iter().zip(responders.into_iter())
804                                    {
805                                        if let Ok(v) = &res {
806                                            max_ok = Some(max_ok.map_or(*v, |m| m.max(*v)));
807                                        }
808                                        let _ = tx.send(res);
809                                    }
810                                    // After a successful batch commit, publish the updated head.
811                                    if let Some(h) = max_ok {
812                                        let _ = head_tx_writer.send(Some(h));
813                                    }
814                                }
815                                Err(e) => {
816                                    // If the batch failed as a whole (e.g., commit failed), propagate the SAME error to all responders.
817                                    // DCBError is not Clone (contains io::Error), so reconstruct a best-effort copy by using its Display text
818                                    // for Io and cloning data for other variants.
819                                    let total = responders.len();
820                                    let mut iter = responders.into_iter();
821                                    for _ in 0..total {
822                                        if let Some(tx) = iter.next() {
823                                            let _ = tx.send(Err(clone_dcb_error(&e)));
824                                        }
825                                    }
826                                }
827                            }
828                        }
829                        WriterRequest::Shutdown => {
830                            break;
831                        }
832                    }
833                }
834            });
835        });
836
837        Ok(Self {
838            mvcc,
839            head_watch_tx: head_tx,
840            writer_request_tx: request_tx,
841        })
842    }
843
844    async fn read(
845        &self,
846        query: Option<DCBQuery>,
847        start: Option<u64>,
848        backwards: bool,
849        limit: Option<u32>,
850    ) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
851        let reader = self.mvcc.reader()?;
852        let last_committed_position = reader.next_position.0.saturating_sub(1);
853
854        let q = query.unwrap_or(DCBQuery { items: vec![] });
855        let start_position = start.map(Position);
856
857        let events = read_conditional(
858            &self.mvcc,
859            &std::collections::HashMap::new(),
860            reader.events_tree_root_id,
861            reader.tags_tree_root_id,
862            q,
863            start_position,
864            backwards,
865            limit,
866            false,
867        )
868        .map_err(|e| DCBError::Corruption(format!("{e}")))?;
869
870        let head = if limit.is_none() {
871            if last_committed_position == 0 {
872                None
873            } else {
874                Some(last_committed_position)
875            }
876        } else {
877            events.last().map(|e| e.position)
878        };
879
880        Ok((events, head))
881    }
882
883    async fn head(&self) -> DCBResult<Option<u64>> {
884        let (_, header) = self
885            .mvcc
886            .get_latest_header()
887            .map_err(|e| DCBError::Corruption(format!("{e}")))?;
888        let last = header.next_position.0.saturating_sub(1);
889        if last == 0 { Ok(None) } else { Ok(Some(last)) }
890    }
    /// Appends `events`, optionally guarded by `condition`, returning the
    /// position of the last event written — or, when the request turns out to
    /// be an idempotent retry, the position at which the same events were
    /// already recorded.
    ///
    /// Flow:
    /// 1. If a condition is given, run an optimistic pre-check against a
    ///    read-only snapshot. A `fail_if_events_match` hit is resolved as
    ///    either an idempotent retry (succeed without writing) or an
    ///    `IntegrityError`.
    /// 2. On a clean pre-check, advance `condition.after` to the snapshot
    ///    head (so the writer thread need not re-scan positions this reader
    ///    already checked), enqueue the append on the single writer thread,
    ///    and await its result via a oneshot channel.
    pub async fn append(
        &self,
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
    ) -> DCBResult<u64> {
        // Optimistic pre-check of the given condition on a read-only MVCC
        // snapshot. NOTE(review): this runs inline on the caller's async task;
        // no spawn_blocking is visible here — confirm reader() is cheap enough.
        let pre_append_decision = if let Some(mut given_condition) = condition {
            let reader = self.mvcc.reader()?;
            // Head as observed by this snapshot (None when the store is empty).
            let current_head = {
                let last = reader.next_position.0.saturating_sub(1);
                if last == 0 { None } else { Some(last) }
            };

            // Perform conditional read on the snapshot (limit 1) starting after the given position
            let from = given_condition.after.map(|after| Position(after + 1));
            let empty_dirty = std::collections::HashMap::new();
            // limit=1: we only need to know whether ANY event matches.
            let found = read_conditional(
                &self.mvcc,
                &empty_dirty,
                reader.events_tree_root_id,
                reader.tags_tree_root_id,
                given_condition.fail_if_events_match.clone(),
                from,
                false,
                Some(1),
                false,
            )?;

            if let Some(matched) = found.first() {
                // Found one event — consider if the request is idempotent...
                match is_request_idempotent(
                    &self.mvcc,
                    &empty_dirty,
                    reader.events_tree_root_id,
                    reader.tags_tree_root_id,
                    &events,
                    given_condition.fail_if_events_match.clone(),
                    from,
                ) {
                    Ok(Some(last_recorded_position)) => {
                        // Request is idempotent; skip actual append and report
                        // the position where the events were already recorded.
                        PreAppendDecision::AlreadyAppended(last_recorded_position)
                    }
                    Ok(None) => {
                        // A matching event exists and it is NOT a retry of this
                        // request: genuine integrity violation.
                        let msg = format!(
                            "condition: {:?} matched: {:?}",
                            given_condition.clone(),
                            matched,
                        );
                        return Err(DCBError::IntegrityError(msg));
                    }
                    Err(err) => {
                        // Propagate underlying read error
                        return Err(err);
                    }
                }
            } else {
                // No match found: we can advance 'after' to the current head observed by this reader,
                // since positions up to that head were just verified not to match.
                let new_after = std::cmp::max(
                    given_condition.after.unwrap_or(0),
                    current_head.unwrap_or(0),
                );
                given_condition.after = Some(new_after);

                PreAppendDecision::UseCondition(Some(given_condition))
            }
        } else {
            // No condition provided at all
            PreAppendDecision::UseCondition(None)
        };

        // Handle the pre-check decision
        match pre_append_decision {
            PreAppendDecision::AlreadyAppended(last_found_position) => {
                // ✅ Request was idempotent — just return the existing position.
                Ok(last_found_position)
            }
            PreAppendDecision::UseCondition(adjusted_condition) => {
                // ✅ Proceed with append operation on the writer thread.
                let (response_tx, response_rx) = oneshot::channel();

                self.writer_request_tx
                    .send(WriterRequest::Append {
                        events,
                        condition: adjusted_condition,
                        response_tx,
                    })
                    .await
                    .map_err(|_| {
                        DCBError::Io(std::io::Error::other(
                            "Failed to send append request to EventStore thread",
                        ))
                    })?;

                // Outer `?` surfaces the channel failure; the inner DCBResult
                // from the writer thread is returned as-is.
                response_rx.await.map_err(|_| {
                    DCBError::Io(std::io::Error::other(
                        "Failed to receive append response from EventStore thread",
                    ))
                })?
            }
        }
    }
994
995    fn watch_head(&self) -> watch::Receiver<Option<u64>> {
996        self.head_watch_tx.subscribe()
997    }
998
999    #[allow(dead_code)]
1000    async fn shutdown(&self) {
1001        let _ = self.writer_request_tx.send(WriterRequest::Shutdown).await;
1002    }
1003}
1004
1005fn clone_dcb_error(src: &DCBError) -> DCBError {
1006    match src {
1007        DCBError::AuthenticationError(err) => DCBError::AuthenticationError(err.to_string()),
1008        DCBError::InitializationError(err) => DCBError::InitializationError(err.to_string()),
1009        DCBError::Io(err) => DCBError::Io(std::io::Error::other(err.to_string())),
1010        DCBError::IntegrityError(s) => DCBError::IntegrityError(s.clone()),
1011        DCBError::Corruption(s) => DCBError::Corruption(s.clone()),
1012        DCBError::PageNotFound(id) => DCBError::PageNotFound(*id),
1013        DCBError::DirtyPageNotFound(id) => DCBError::DirtyPageNotFound(*id),
1014        DCBError::RootIDMismatch(old_id, new_id) => DCBError::RootIDMismatch(*old_id, *new_id),
1015        DCBError::DatabaseCorrupted(s) => DCBError::DatabaseCorrupted(s.clone()),
1016        DCBError::InternalError(s) => DCBError::InternalError(s.clone()),
1017        DCBError::SerializationError(s) => DCBError::SerializationError(s.clone()),
1018        DCBError::DeserializationError(s) => DCBError::DeserializationError(s.clone()),
1019        DCBError::PageAlreadyFreed(id) => DCBError::PageAlreadyFreed(*id),
1020        DCBError::PageAlreadyDirty(id) => DCBError::PageAlreadyDirty(*id),
1021        DCBError::TransportError(err) => DCBError::TransportError(err.clone()),
1022        DCBError::CancelledByUser() => DCBError::CancelledByUser(),
1023    }
1024}
1025
1026// Clone implementation for EventStoreHandle
1027impl Clone for RequestHandler {
1028    fn clone(&self) -> Self {
1029        Self {
1030            mvcc: self.mvcc.clone(),
1031            head_watch_tx: self.head_watch_tx.clone(),
1032            writer_request_tx: self.writer_request_tx.clone(),
1033        }
1034    }
1035}
1036
/// Outcome of the optimistic pre-check `append` performs against a read-only
/// snapshot before enqueueing work on the writer thread.
#[derive(Debug)]
enum PreAppendDecision {
    /// Proceed with this (possibly adjusted) condition — `after` may have been
    /// advanced to the snapshot head observed during the pre-check.
    UseCondition(Option<DCBAppendCondition>),
    /// Skip append operation because the request was idempotent; return last recorded position
    AlreadyAppended(u64),
}