//! umadb_server/lib.rs — gRPC server front-end for UmaDB.
1use futures::Stream;
2use std::fs;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::LazyLock;
7use std::thread;
8use std::time::Instant;
9use tokio::sync::{mpsc, oneshot, watch};
10use tokio_stream::wrappers::ReceiverStream;
11use tonic::transport::{Identity, ServerTlsConfig};
12use tonic::{Request, Response, Status, transport::Server};
13use umadb_core::db::{
14    DEFAULT_DB_FILENAME, DEFAULT_PAGE_SIZE, UmaDb, clone_dcb_error, is_integrity_error,
15    is_request_idempotent, read_conditional, shadow_for_batch_abort,
16};
17use umadb_core::mvcc::Mvcc;
18use umadb_dcb::{
19    DcbAppendCondition, DcbEvent, DcbQuery, DcbResult, DcbSequencedEvent, DcbError, TrackingInfo,
20};
21
22use tokio::runtime::Runtime;
23use tonic::codegen::http;
24use tonic::transport::server::TcpIncoming;
25use umadb_core::common::Position;
26
27use std::convert::Infallible;
28use std::future::Future;
29use std::task::{Context, Poll};
30use tonic::server::NamedService;
31use umadb_proto::status_from_dcb_error;
32
33// This is just to maintain compatibility for the very early unversioned API (pre-v1).
// This is just to maintain compatibility for the very early unversioned API (pre-v1).
// Wraps an inner tower service and rewrites legacy "/umadb.UmaDBService/*"
// request paths to the current "/umadb.v1.DCB/*" form before delegating.
#[derive(Clone, Debug)]
pub struct PathRewriterService<S> {
    // The wrapped service that receives the (possibly rewritten) request.
    inner: S,
}
38
39impl<S> tower::Service<http::Request<tonic::body::Body>> for PathRewriterService<S>
40where
41    S: tower::Service<
42            http::Request<tonic::body::Body>,
43            Response = http::Response<tonic::body::Body>,
44            Error = Infallible,
45        > + Clone
46        + Send
47        + 'static,
48    S::Future: Send + 'static,
49{
50    type Response = S::Response;
51    type Error = S::Error;
52    type Future =
53        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
54
55    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
56        self.inner.poll_ready(cx)
57    }
58
59    fn call(&mut self, mut req: http::Request<tonic::body::Body>) -> Self::Future {
60        let uri = req.uri().clone();
61        let path = uri.path();
62
63        // Check and rewrite the path string first
64        if path.starts_with("/umadb.UmaDBService/") {
65            let new_path_str = path.replace("/umadb.UmaDBService/", "/umadb.v1.DCB/");
66
67            // Use the existing authority and scheme if present, otherwise default to a simple path-only URI structure
68            // which is often safer than hardcoded hostnames in internal systems.
69            let new_uri = if let (Some(scheme), Some(authority)) = (uri.scheme(), uri.authority()) {
70                // If we have all components, try to build the full URI
71                http::Uri::builder()
72                    .scheme(scheme.clone())
73                    .authority(authority.clone())
74                    .path_and_query(new_path_str.as_str())
75                    .build()
76                    .ok() // Convert the final build Result into an Option
77            } else {
78                // Fallback for malformed requests (missing scheme/authority)
79                // Just try to build a path-only URI
80                new_path_str.parse::<http::Uri>().ok()
81            };
82
83            if let Some(final_uri) = new_uri {
84                *req.uri_mut() = final_uri;
85            } else {
86                eprintln!("failed to construct valid URI for path: {}", path);
87            }
88        }
89
90        let fut = self.inner.call(req);
91        Box::pin(fut)
92    }
93}
94
95// Add this implementation to satisfy the compiler error
// Forward the gRPC service name of the wrapped service so tonic's router
// can still route by name when the rewriter is layered on top.
impl<S: NamedService> NamedService for PathRewriterService<S> {
    const NAME: &'static str = S::NAME;
}
99
// Zero-sized tower layer that wraps services in a `PathRewriterService`.
#[derive(Clone, Debug)]
pub struct PathRewriterLayer;
102
// `tower::Layer` glue: wrapping a service `S` yields `PathRewriterService<S>`.
impl<S> tower::Layer<S> for PathRewriterLayer
where
    S: tower::Service<
            http::Request<tonic::body::Body>,
            Response = http::Response<tonic::body::Body>,
            Error = Infallible,
        > + Clone
        + Send
        + 'static,
    S::Future: Send + 'static,
{
    type Service = PathRewriterService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        PathRewriterService { inner }
    }
}
120
// Lazily captured server start instant; `uptime()` measures from the first access.
static START_TIME: LazyLock<Instant> = LazyLock::new(Instant::now);

// Maximum events accepted per append batch.
// NOTE(review): not referenced in the code visible here — presumably enforced
// in the writer-thread batching; confirm.
const APPEND_BATCH_MAX_EVENTS: usize = 2000;
// Default number of events per streamed read/subscribe response message.
const READ_RESPONSE_BATCH_SIZE_DEFAULT: u32 = 100;
// Upper bound on the client-requested per-response batch size.
const READ_RESPONSE_BATCH_SIZE_MAX: u32 = 5000;
126
127// Optional TLS configuration helpers
// Optional TLS configuration helpers
#[derive(Clone, Debug)]
pub struct ServerTlsOptions {
    // PEM-encoded server certificate (chain).
    pub cert_pem: Vec<u8>,
    // PEM-encoded private key matching `cert_pem`.
    pub key_pem: Vec<u8>,
}
133
/// Time elapsed since the server's start instant.
///
/// `START_TIME` is lazy, so this measures from its first access
/// (early in server startup), not from process launch.
pub fn uptime() -> std::time::Duration {
    START_TIME.elapsed()
}
137
138fn build_server_builder_with_options(tls: Option<ServerTlsOptions>) -> Server {
139    use std::time::Duration;
140    let mut server_builder = Server::builder()
141        .http2_keepalive_interval(Some(Duration::from_secs(5)))
142        .http2_keepalive_timeout(Some(Duration::from_secs(10)))
143        .initial_stream_window_size(Some(4 * 1024 * 1024))
144        .initial_connection_window_size(Some(8 * 1024 * 1024))
145        .tcp_nodelay(true)
146        .concurrency_limit_per_connection(1024);
147
148    if let Some(opts) = tls {
149        let identity = Identity::from_pem(opts.cert_pem, opts.key_pem);
150        server_builder = server_builder
151            .tls_config(ServerTlsConfig::new().identity(identity))
152            .expect("failed to apply TLS config");
153    }
154
155    server_builder
156}
157
158// Function to start the gRPC server with a shutdown signal
// Function to start the gRPC server with a shutdown signal
/// Starts an insecure (plaintext) server at `addr`, serving the database at
/// `path`, until `shutdown_rx` fires. No TLS, no API key.
pub async fn start_server<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None, None).await
}
166
167/// Start server with TLS using PEM-encoded cert and key.
/// Start server with TLS using PEM-encoded cert and key.
pub async fn start_server_secure<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    cert_pem: Vec<u8>,
    key_pem: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error>> {
    let tls = ServerTlsOptions { cert_pem, key_pem };
    start_server_internal(path, addr, shutdown_rx, Some(tls), None).await
}
178
179/// Convenience: load cert and key from filesystem paths
180pub async fn start_server_secure_from_files<
181    P: AsRef<Path> + Send + 'static,
182    CP: AsRef<Path>,
183    KP: AsRef<Path>,
184>(
185    path: P,
186    addr: &str,
187    shutdown_rx: oneshot::Receiver<()>,
188    cert_path: CP,
189    key_path: KP,
190) -> Result<(), Box<dyn std::error::Error>> {
191    let cert_path_ref = cert_path.as_ref();
192    let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
193        format!(
194            "failed to open TLS certificate file '{}': {}",
195            cert_path_ref.display(),
196            e
197        )
198        .into()
199    })?;
200
201    let key_path_ref = key_path.as_ref();
202    let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
203        format!(
204            "failed to open TLS key file '{}': {}",
205            key_path_ref.display(),
206            e
207        )
208        .into()
209    })?;
210    start_server_secure(path, addr, shutdown_rx, cert_pem, key_pem).await
211}
212
213/// Start server (insecure) requiring an API key for all RPCs.
/// Start server (insecure) requiring an API key for all RPCs.
pub async fn start_server_with_api_key<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    api_key: String,
) -> Result<(), Box<dyn std::error::Error>> {
    start_server_internal(path, addr, shutdown_rx, None, Some(api_key)).await
}
222
223/// Start TLS server requiring an API key for all RPCs.
/// Start TLS server requiring an API key for all RPCs.
pub async fn start_server_secure_with_api_key<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    cert_pem: Vec<u8>,
    key_pem: Vec<u8>,
    api_key: String,
) -> Result<(), Box<dyn std::error::Error>> {
    let tls = ServerTlsOptions { cert_pem, key_pem };
    start_server_internal(path, addr, shutdown_rx, Some(tls), Some(api_key)).await
}
235
236/// TLS server from files requiring an API key
237pub async fn start_server_secure_from_files_with_api_key<
238    P: AsRef<Path> + Send + 'static,
239    CP: AsRef<Path>,
240    KP: AsRef<Path>,
241>(
242    path: P,
243    addr: &str,
244    shutdown_rx: oneshot::Receiver<()>,
245    cert_path: CP,
246    key_path: KP,
247    api_key: String,
248) -> Result<(), Box<dyn std::error::Error>> {
249    let cert_path_ref = cert_path.as_ref();
250    let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
251        format!(
252            "failed to open TLS certificate file '{}': {}",
253            cert_path_ref.display(),
254            e
255        )
256        .into()
257    })?;
258
259    let key_path_ref = key_path.as_ref();
260    let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
261        format!(
262            "failed to open TLS key file '{}': {}",
263            key_path_ref.display(),
264            e
265        )
266        .into()
267    })?;
268    start_server_secure_with_api_key(path, addr, shutdown_rx, cert_pem, key_pem, api_key).await
269}
270
/// Common server bootstrap used by every public `start_server*` variant.
///
/// Binds the listener, opens the database, registers the gRPC health service
/// and the DCB service (behind a `PathRewriterLayer` for pre-v1 clients), and
/// serves until `shutdown_rx` fires. On shutdown it flips health to
/// NOT_SERVING and broadcasts to subscription tasks before returning.
async fn start_server_internal<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    tls: Option<ServerTlsOptions>,
    api_key: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let addr = addr.parse()?;
    // ---- Bind incoming manually like tonic ----
    let incoming = match TcpIncoming::bind(addr) {
        Ok(incoming) => incoming,
        Err(err) => {
            return Err(Box::new(DcbError::InitializationError(format!(
                "failed to bind to address {}: {}",
                addr, err
            ))));
        }
    }
    .with_nodelay(Some(true))
    .with_keepalive(Some(std::time::Duration::from_secs(60)));

    // Create a shutdown broadcast channel for terminating ongoing subscriptions
    let (srv_shutdown_tx, srv_shutdown_rx) = watch::channel(false);
    let dcb_server =
        match DcbServer::new(path.as_ref().to_owned(), srv_shutdown_rx, api_key.clone()) {
            Ok(server) => server,
            Err(err) => {
                return Err(Box::new(err));
            }
        };

    // NOTE(review): this prints the head position; it equals the event count
    // only if positions are contiguous from 1 — confirm.
    println!(
        "UmaDB has {:?} events",
        dcb_server.request_handler.head().await?.unwrap_or(0)
    );
    let tls_mode_display_str = if tls.is_some() {
        "with TLS"
    } else {
        "without TLS"
    };

    let api_key_display_str = if api_key.is_some() {
        "with API key"
    } else {
        "without API key"
    };

    // gRPC Health service setup
    use tonic_health::ServingStatus; // server API expects this enum
    let (health_reporter, health_service) = tonic_health::server::health_reporter();
    // Set overall ("") and service-specific health to SERVING
    health_reporter
        .set_service_status("", ServingStatus::Serving)
        .await;
    health_reporter
        .set_service_status("umadb.v1.DCB", ServingStatus::Serving)
        .await;
    let health_reporter_for_shutdown = health_reporter.clone();

    // Apply PathRewriterLayer at the server level to intercept all requests before routing
    let mut builder = build_server_builder_with_options(tls)
        .layer(PathRewriterLayer)
        .add_service(health_service);

    // Add DCB service (auth enforced inside RPC handlers if configured)
    builder = builder.add_service(dcb_server.into_service());
    let router = builder;

    println!("UmaDB is listening on {addr} ({tls_mode_display_str}, {api_key_display_str})");
    println!("UmaDB started in {:?}", uptime());
    // let incoming = router.server.bind_incoming();
    router
        .serve_with_incoming_shutdown(incoming, async move {
            // Wait for an external shutdown trigger
            let _ = shutdown_rx.await;
            // Mark health as NOT_SERVING before shutdown
            let _ = health_reporter_for_shutdown
                .set_service_status("", ServingStatus::NotServing)
                .await;
            let _ = health_reporter_for_shutdown
                .set_service_status("umadb.v1.DCB", ServingStatus::NotServing)
                .await;
            // Broadcast shutdown to all subscription tasks
            let _ = srv_shutdown_tx.send(true);
            println!("UmaDB server shutdown complete");
        })
        .await?;

    Ok(())
}
361
362// gRPC server implementation
// gRPC server implementation
pub struct DcbServer {
    // Shared handle for reads and for submitting appends to the writer thread.
    pub(crate) request_handler: RequestHandler,
    // Becomes `true` when the server is shutting down; cloned into each
    // streaming task so subscriptions can terminate promptly.
    shutdown_watch_rx: watch::Receiver<bool>,
    // When `Some`, every RPC must present `authorization: Bearer <key>`.
    api_key: Option<String>,
}
368
369impl DcbServer {
370    pub fn new<P: AsRef<Path> + Send + 'static>(
371        path: P,
372        shutdown_rx: watch::Receiver<bool>,
373        api_key: Option<String>,
374    ) -> DcbResult<Self> {
375        let command_handler = RequestHandler::new(path)?;
376        Ok(Self {
377            request_handler: command_handler,
378            shutdown_watch_rx: shutdown_rx,
379            api_key,
380        })
381    }
382
383    pub fn into_service(self) -> umadb_proto::v1::dcb_server::DcbServer<Self> {
384        umadb_proto::v1::dcb_server::DcbServer::new(self)
385    }
386
387    fn enforce_api_key(&self, metadata: &tonic::metadata::MetadataMap) -> Result<(), Status> {
388        if let Some(expected) = &self.api_key {
389            let auth = metadata.get("authorization");
390            let expected_val = format!("Bearer {}", expected);
391            let ok = auth
392                .and_then(|m| m.to_str().ok())
393                .map(|s| s == expected_val)
394                .unwrap_or(false);
395            if !ok {
396                return Err(status_from_dcb_error(DcbError::AuthenticationError(
397                    "missing or invalid API key".to_string(),
398                )));
399            }
400        }
401        Ok(())
402    }
403}
404
#[tonic::async_trait]
impl umadb_proto::v1::dcb_server::Dcb for DcbServer {
    // Stream type for `read`: batches of sequenced events (or an error status).
    type ReadStream =
        Pin<Box<dyn Stream<Item = Result<umadb_proto::v1::ReadResponse, Status>> + Send + 'static>>;
    // Stream type for `subscribe`: open-ended batches of sequenced events.
    type SubscribeStream = Pin<
        Box<dyn Stream<Item = Result<umadb_proto::v1::SubscribeResponse, Status>> + Send + 'static>,
    >;

    /// Streams events matching the request's query, in batches, starting at
    /// `start`, optionally backwards, optionally limited, and optionally as a
    /// live subscription (`subscribe = true`) that waits for new events.
    async fn read(
        &self,
        request: Request<umadb_proto::v1::ReadRequest>,
    ) -> Result<Response<Self::ReadStream>, Status> {
        // Enforce API key if configured
        self.enforce_api_key(request.metadata())?;
        let read_request = request.into_inner();

        // Convert protobuf query to DCB types
        let mut query: Option<DcbQuery> = read_request.query.map(|q| q.into());
        let start = read_request.start;
        let backwards = read_request.backwards.unwrap_or(false);
        let limit = read_request.limit;
        // Cap requested batch size.
        let batch_size = read_request
            .batch_size
            .unwrap_or(READ_RESPONSE_BATCH_SIZE_DEFAULT)
            .clamp(1, READ_RESPONSE_BATCH_SIZE_MAX);
        let subscribe = read_request.subscribe.unwrap_or(false);

        // Create a channel for streaming responses (deeper buffer to reduce backpressure under concurrency)
        let (tx, rx) = mpsc::channel(2048);
        // Clone the request handler.
        let request_handler = self.request_handler.clone();
        // Clone the shutdown watch receiver.
        let mut shutdown_watch_rx = self.shutdown_watch_rx.clone();

        // Spawn a task to handle the read operation and stream multiple batches
        tokio::spawn(async move {
            // Ensure we can reuse the same query across batches
            let query_clone = query.take();
            let mut next_start = start;
            let mut sent_any = false;
            // `None` limit is modeled as u32::MAX remaining.
            let mut remaining_limit = limit.unwrap_or(u32::MAX);
            // Create a watch receiver for head updates (for subscriptions)
            // TODO: Make this an Option and only do this for subscriptions?
            let mut head_rx = request_handler.watch_head();
            // If non-subscription read, capture head to preserve point-in-time semantics
            // (events appended after this read started are excluded).
            let captured_head = if !subscribe {
                request_handler.head().await.unwrap_or(None)
            } else {
                None
            };
            loop {
                // If this is a subscription, exit if the client
                // has gone away or the server is shutting down.
                if subscribe {
                    if tx.is_closed() {
                        break;
                    }
                    if *shutdown_watch_rx.borrow() {
                        break;
                    }
                }
                // Determine per-iteration limit.
                let read_limit = remaining_limit.min(batch_size);
                // If subscription and remaining exhausted (limit reached), terminate
                if subscribe && limit.is_some() && remaining_limit == 0 {
                    break;
                }
                match request_handler
                    .read(query_clone.clone(), next_start, backwards, Some(read_limit))
                    .await
                {
                    Ok((dcb_sequenced_events, head)) => {
                        // Capture the original length before consuming events
                        let original_len = dcb_sequenced_events.len();

                        // Filter and map events, discarding those with position > captured_head
                        let sequenced_event_protos: Vec<umadb_proto::v1::SequencedEvent> =
                            dcb_sequenced_events
                                .into_iter()
                                .filter(|e| {
                                    if let Some(h) = captured_head {
                                        e.position <= h
                                    } else {
                                        true
                                    }
                                })
                                .map(umadb_proto::v1::SequencedEvent::from)
                                .collect();

                        // Any filtered-out event means we crossed the captured head.
                        let reached_captured_head = if captured_head.is_some() {
                            // Check if we filtered out any events
                            sequenced_event_protos.len() < original_len
                        } else {
                            false
                        };

                        // Calculate head to send based on context
                        // For subscriptions: use current head
                        // For unlimited non-subscription reads: use captured_head
                        // For limited reads: use last event position (or current head if empty)
                        let last_event_position = sequenced_event_protos.last().map(|e| e.position);
                        let head_to_send = if subscribe {
                            head
                        } else if limit.is_none() {
                            captured_head
                        } else {
                            last_event_position.or(head)
                        };

                        if sequenced_event_protos.is_empty() {
                            // Only send an empty response to communicate head if this is the first
                            if !sent_any {
                                let response = umadb_proto::v1::ReadResponse {
                                    events: vec![],
                                    head: head_to_send,
                                };
                                let _ = tx.send(Ok(response)).await;
                            }
                            // For subscriptions, wait for new events instead of terminating
                            if subscribe {
                                // Wait for either a new head or a server shutdown signal
                                tokio::select! {
                                    _ = head_rx.changed() => {},
                                    _ = shutdown_watch_rx.changed() => {},
                                    _ = tx.closed() => {},
                                }
                                continue;
                            }
                            break;
                        }

                        // Capture values needed after sequenced_event_protos is moved
                        let sent_count = sequenced_event_protos.len() as u32;

                        let response = umadb_proto::v1::ReadResponse {
                            events: sequenced_event_protos,
                            head: head_to_send,
                        };

                        // A send error means the client disconnected.
                        if tx.send(Ok(response)).await.is_err() {
                            break;
                        }
                        sent_any = true;

                        // Advance the cursor (use a new reader on the next loop iteration).
                        // NOTE(review): `p + 1` / `p - 1` could wrap at the u64 extremes.
                        // Positions appear to start at 1 (head is None when last == 0 in
                        // RequestHandler::new), so `p - 1` shouldn't underflow — confirm.
                        next_start =
                            last_event_position.map(|p| if !backwards { p + 1 } else { p - 1 });

                        // Stop streaming further if we reached the
                        // captured head boundary (non-subscriber only).
                        if reached_captured_head && !subscribe {
                            break;
                        }

                        // Decrease the remaining overall limit if any, and stop if reached
                        if limit.is_some() {
                            if remaining_limit <= sent_count {
                                remaining_limit = 0;
                            } else {
                                remaining_limit -= sent_count;
                            }
                            if remaining_limit == 0 {
                                break;
                            }
                        }

                        // Yield to let other tasks progress under high concurrency
                        tokio::task::yield_now().await;
                    }
                    Err(e) => {
                        let _ = tx.send(Err(status_from_dcb_error(e))).await;
                        break;
                    }
                }
            }
        });

        // Return the receiver as a stream
        Ok(Response::new(
            Box::pin(ReceiverStream::new(rx)) as Self::ReadStream
        ))
    }

    /// Live subscription: streams events matching the query from just after
    /// `after`, forever (until the client disconnects or the server stops).
    async fn subscribe(
        &self,
        request: Request<umadb_proto::v1::SubscribeRequest>,
    ) -> Result<Response<Self::SubscribeStream>, Status> {
        // Enforce API key if configured
        self.enforce_api_key(request.metadata())?;
        let subscribe_request = request.into_inner();

        // Convert protobuf query to DCB types
        let mut query: Option<DcbQuery> = subscribe_request.query.map(|q| q.into());
        let after = subscribe_request.after;
        // Cap requested batch size.
        let batch_size = subscribe_request
            .batch_size
            .unwrap_or(READ_RESPONSE_BATCH_SIZE_DEFAULT)
            .clamp(1, READ_RESPONSE_BATCH_SIZE_MAX);

        // Create a channel for streaming responses
        let (tx, rx) = mpsc::channel(2048);
        // Clone the request handler.
        let request_handler = self.request_handler.clone();
        // Clone the shutdown watch receiver.
        let mut shutdown_watch_rx = self.shutdown_watch_rx.clone();

        // Spawn a task to handle the subscribe operation and stream multiple batches
        tokio::spawn(async move {
            // Ensure we can reuse the same query across batches
            let query_clone = query.take();
            // Todo: End the subscription if after is Some(u64:MAX).
            // Start reading from the position just after `after`.
            let mut next_after = after.map(|a| a.saturating_add(1));
            // Create a watch receiver for head updates
            let mut head_rx = request_handler.watch_head();

            loop {
                // Exit if the client has gone away or the server is shutting down.
                if tx.is_closed() {
                    break;
                }
                if *shutdown_watch_rx.borrow() {
                    break;
                }

                // Always read forward with the requested batch size
                match request_handler
                    .read(query_clone.clone(), next_after, false, Some(batch_size))
                    .await
                {
                    Ok((dcb_sequenced_events, _head)) => {
                        // Map events to protobuf type
                        let sequenced_event_protos: Vec<umadb_proto::v1::SequencedEvent> =
                            dcb_sequenced_events
                                .into_iter()
                                .map(umadb_proto::v1::SequencedEvent::from)
                                .collect();

                        if sequenced_event_protos.is_empty() {
                            // For subscriptions, wait for new events instead of terminating
                            tokio::select! {
                                _ = head_rx.changed() => {},
                                _ = shutdown_watch_rx.changed() => {},
                                _ = tx.closed() => {},
                            }
                            continue;
                        }

                        let last_event_position = sequenced_event_protos.last().map(|e| e.position);

                        let response = umadb_proto::v1::SubscribeResponse {
                            events: sequenced_event_protos,
                        };

                        // A send error means the client disconnected.
                        if tx.send(Ok(response)).await.is_err() {
                            break;
                        }

                        // Advance the cursor (use a new reader on the next loop iteration)
                        // Todo: End the subscription if last_event_position is Some(u64:MAX).
                        next_after = last_event_position.map(|p| p.saturating_add(1));

                        // Yield to let other tasks progress under high concurrency
                        tokio::task::yield_now().await;
                    }
                    Err(e) => {
                        let _ = tx.send(Err(status_from_dcb_error(e))).await;
                        break;
                    }
                }
            }
        });

        // Return the receiver as a stream
        Ok(Response::new(
            Box::pin(ReceiverStream::new(rx)) as Self::SubscribeStream
        ))
    }

    /// Appends a batch of events, optionally guarded by an append condition
    /// and tagged with upstream tracking info; returns the commit position.
    /// NOTE(review): APPEND_BATCH_MAX_EVENTS is not checked here — presumably
    /// enforced further down the stack; confirm.
    async fn append(
        &self,
        request: Request<umadb_proto::v1::AppendRequest>,
    ) -> Result<Response<umadb_proto::v1::AppendResponse>, Status> {
        // Enforce API key if configured
        self.enforce_api_key(request.metadata())?;
        let req = request.into_inner();

        // Convert protobuf types to API types (TryFrom: events can be malformed)
        let events: Vec<DcbEvent> = match req.events.into_iter().map(|e| e.try_into()).collect() {
            Ok(events) => events,
            Err(e) => {
                return Err(status_from_dcb_error(e));
            }
        };
        let condition = req.condition.map(|c| c.into());

        // Call the event store append method
        match self
            .request_handler
            .append(
                events,
                condition,
                req.tracking_info.map(|t| TrackingInfo {
                    source: t.source,
                    position: t.position,
                }),
            )
            .await
        {
            Ok(position) => Ok(Response::new(umadb_proto::v1::AppendResponse { position })),
            Err(e) => Err(status_from_dcb_error(e)),
        }
    }

    /// Returns the current head position (`None` when the store is empty).
    async fn head(
        &self,
        request: Request<umadb_proto::v1::HeadRequest>,
    ) -> Result<Response<umadb_proto::v1::HeadResponse>, Status> {
        // Enforce API key if configured
        self.enforce_api_key(request.metadata())?;
        // Call the event store head method
        match self.request_handler.head().await {
            Ok(position) => {
                // Return the position as a response
                Ok(Response::new(umadb_proto::v1::HeadResponse { position }))
            }
            Err(e) => Err(status_from_dcb_error(e)),
        }
    }

    /// Returns the recorded tracking position for the given upstream source.
    async fn get_tracking_info(
        &self,
        request: Request<umadb_proto::v1::TrackingRequest>,
    ) -> Result<Response<umadb_proto::v1::TrackingResponse>, Status> {
        // Enforce API key if configured
        self.enforce_api_key(request.metadata())?;
        let req = request.into_inner();
        match self.request_handler.get_tracking_info(req.source).await {
            Ok(position) => Ok(Response::new(umadb_proto::v1::TrackingResponse {
                position,
            })),
            Err(e) => Err(status_from_dcb_error(e)),
        }
    }
}
751
752// Message types for communication between the gRPC server and the request handler's writer thread
// Message types for communication between the gRPC server and the request handler's writer thread
enum WriterRequest {
    // Append a batch of events; the outcome (commit position or error)
    // is delivered back through `response_tx`.
    Append {
        events: Vec<DcbEvent>,
        condition: Option<DcbAppendCondition>,
        tracking_info: Option<TrackingInfo>,
        response_tx: oneshot::Sender<DcbResult<u64>>,
    },
    // Ask the writer thread to stop processing and exit.
    Shutdown,
}
762
763// Thread-safe request handler
// Thread-safe request handler
struct RequestHandler {
    // Shared MVCC storage engine; reads go through this directly.
    mvcc: Arc<Mvcc>,
    // Broadcasts the latest head position so subscriptions can wake on appends.
    head_watch_tx: watch::Sender<Option<u64>>,
    // Serializes writes by sending them to the dedicated writer thread.
    writer_request_tx: mpsc::Sender<WriterRequest>,
}
769
770impl RequestHandler {
771    fn new<P: AsRef<Path> + Send + 'static>(path: P) -> DcbResult<Self> {
772        // Create a channel for sending requests to the writer thread
773        let (request_tx, mut request_rx) = mpsc::channel::<WriterRequest>(1024);
774
775        // Build a shared Mvcc instance (Arc) upfront so reads can proceed concurrently
776        let p = path.as_ref();
777        let file_path = if p.is_dir() {
778            p.join(DEFAULT_DB_FILENAME)
779        } else {
780            p.to_path_buf()
781        };
782        let mvcc = Arc::new(Mvcc::new(&file_path, DEFAULT_PAGE_SIZE, false)?);
783
784        // Initialize the head watch channel with the current head.
785        let init_head = {
786            let (_, header) = mvcc.get_latest_header()?;
787            let last = header.next_position.0.saturating_sub(1);
788            if last == 0 { None } else { Some(last) }
789        };
790        let (head_tx, _head_rx) = watch::channel::<Option<u64>>(init_head);
791
792        // Spawn a thread for processing writer requests.
793        let mvcc_for_writer = mvcc.clone();
794        let head_tx_writer = head_tx.clone();
795        thread::spawn(move || {
796            let db = UmaDb::from_arc(mvcc_for_writer);
797
798            // Create a runtime for processing writer requests.
799            let rt = Runtime::new().unwrap();
800
801            // Process writer requests.
802            rt.block_on(async {
803                while let Some(request) = request_rx.recv().await {
804                    match request {
805                        WriterRequest::Append {
806                            events,
807                            condition,
808                            tracking_info,
809                            response_tx,
810                        } => {
811                            // Batch processing: drain any immediately available requests
812                            // let mut items: Vec<(Vec<DCBEvent>, Option<DCBAppendCondition>)> =
813                            //     Vec::new();
814
815                            let mut total_events = 0;
816                            total_events += events.len();
817                            // items.push((events, condition));
818
819                            let mvcc = &db.mvcc;
820                            let mut writer = match mvcc.writer() {
821                                Ok(writer) => writer,
822                                Err(err) => {
823                                    let _ = response_tx.send(Err(err));
824                                    continue;
825                                }
826                            };
827
828                            let mut responders: Vec<oneshot::Sender<DcbResult<u64>>> = Vec::new();
829                            let mut results: Vec<DcbResult<u64>> = Vec::new();
830
831                            // Track abort state for non-integrity error within the batch
832                            let mut abort_idx: Option<usize> = None;
833                            let mut abort_err: Option<DcbError> = None;
834
835                            responders.push(response_tx);
836                            let result = UmaDb::process_append_request(
837                                events,
838                                condition,
839                                tracking_info,
840                                mvcc,
841                                &mut writer,
842                            );
843                            // Record result and possibly mark abort
844                            match &result {
845                                Ok(_) => results.push(result),
846                                Err(e) if is_integrity_error(e) => {
847                                    results.push(Err(clone_dcb_error(e)))
848                                }
849                                Err(e) => {
850                                    abort_idx = Some(0);
851                                    abort_err = Some(clone_dcb_error(e));
852                                    results.push(Err(clone_dcb_error(e)));
853                                }
854                            }
855
856                            // Drain the channel for more pending writer requests without awaiting.
857                            // Important: do not drop a popped request when hitting the batch limit.
858                            // We stop draining BEFORE attempting to recv if we've reached the limit.
859                            loop {
860                                if total_events >= APPEND_BATCH_MAX_EVENTS {
861                                    break;
862                                }
863                                // Stop draining if we've already decided to abort
864                                if abort_idx.is_some() {
865                                    break;
866                                }
867                                match request_rx.try_recv() {
868                                    Ok(WriterRequest::Append {
869                                        events,
870                                        condition,
871                                        tracking_info,
872                                        response_tx,
873                                    }) => {
874                                        let ev_len = events.len();
875                                        let idx_in_batch = responders.len();
876                                        responders.push(response_tx);
877                                        let res_next = UmaDb::process_append_request(
878                                            events,
879                                            condition,
880                                            tracking_info,
881                                            mvcc,
882                                            &mut writer,
883                                        );
884                                        match &res_next {
885                                            Ok(_) => results.push(res_next),
886                                            Err(e) if is_integrity_error(e) => {
887                                                results.push(Err(clone_dcb_error(e)))
888                                            }
889                                            Err(e) => {
890                                                abort_idx = Some(idx_in_batch);
891                                                abort_err = Some(clone_dcb_error(e));
892                                                results.push(Err(clone_dcb_error(e)));
893                                                // Do not accumulate more into the batch
894                                            }
895                                        }
896                                        total_events += ev_len;
897                                    }
898                                    Ok(WriterRequest::Shutdown) => {
899                                        // Push back the shutdown signal by breaking and letting
900                                        // outer loop handle after batch. We'll process the
901                                        // current batch first, then break the outer loop on
902                                        // the next iteration when the channel is empty.
903                                        break;
904                                    }
905                                    Err(mpsc::error::TryRecvError::Empty) => {
906                                        break;
907                                    }
908                                    Err(mpsc::error::TryRecvError::Disconnected) => break,
909                                }
910                            }
911                            // println!("Total events: {total_events}");
912
913                            if let (Some(failed_at), Some(orig_err)) = (abort_idx, abort_err) {
914                                // Abort batch: skip commit; respond to all items in this batch
915                                let shadow = shadow_for_batch_abort(&orig_err);
916                                for (i, tx) in responders.into_iter().enumerate() {
917                                    if i == failed_at {
918                                        let _ = tx.send(Err(clone_dcb_error(&orig_err)));
919                                    } else {
920                                        let _ = tx.send(Err(clone_dcb_error(&shadow)));
921                                    }
922                                }
923                                // Do not update head, since nothing was committed
924                                continue;
925                            }
926
927                            // Single commit at the end of the batch
928                            let batch_result = match mvcc.commit(&mut writer) {
929                                Ok(_) => Ok(results),
930                                Err(err) => Err(err),
931                            };
932
933                            match batch_result {
934                                Ok(results) => {
935                                    // Send individual results back to requesters
936                                    for (res, tx) in results.into_iter().zip(responders.into_iter())
937                                    {
938                                        let _ = tx.send(res);
939                                    }
940                                    // After a successful batch commit, publish the updated head from writer.next_position.
941                                    let last_committed = writer.next_position.0.saturating_sub(1);
942                                    let new_head = if last_committed == 0 {
943                                        None
944                                    } else {
945                                        Some(last_committed)
946                                    };
947                                    let _ = head_tx_writer.send(new_head);
948                                }
949                                Err(e) => {
950                                    // If the batch failed as a whole (e.g., commit failed), propagate the SAME error to all responders.
951                                    // DCBError is not Clone (contains io::Error), so reconstruct a best-effort copy by using its Display text
952                                    // for Io and cloning data for other variants.
953                                    let total = responders.len();
954                                    let mut iter = responders.into_iter();
955                                    for _ in 0..total {
956                                        if let Some(tx) = iter.next() {
957                                            let _ = tx.send(Err(clone_dcb_error(&e)));
958                                        }
959                                    }
960                                }
961                            }
962                        }
963                        WriterRequest::Shutdown => {
964                            break;
965                        }
966                    }
967                }
968            });
969        });
970
971        Ok(Self {
972            mvcc,
973            head_watch_tx: head_tx,
974            writer_request_tx: request_tx,
975        })
976    }
977
978    async fn read(
979        &self,
980        query: Option<DcbQuery>,
981        start: Option<u64>,
982        backwards: bool,
983        limit: Option<u32>,
984    ) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)> {
985        let reader = self.mvcc.reader()?;
986        let last_committed_position = reader.next_position.0.saturating_sub(1);
987
988        let q = query.unwrap_or(DcbQuery { items: vec![] });
989        let start_position = start.map(Position);
990
991        let events = read_conditional(
992            &self.mvcc,
993            &std::collections::HashMap::new(),
994            reader.events_tree_root_id,
995            reader.tags_tree_root_id,
996            q,
997            start_position,
998            backwards,
999            limit,
1000            false,
1001        )
1002        .map_err(|e| DcbError::Corruption(format!("{e}")))?;
1003
1004        let head = if limit.is_none() {
1005            if last_committed_position == 0 {
1006                None
1007            } else {
1008                Some(last_committed_position)
1009            }
1010        } else {
1011            events.last().map(|e| e.position)
1012        };
1013
1014        Ok((events, head))
1015    }
1016
1017    async fn head(&self) -> DcbResult<Option<u64>> {
1018        let (_, header) = self
1019            .mvcc
1020            .get_latest_header()
1021            .map_err(|e| DcbError::Corruption(format!("{e}")))?;
1022        let last = header.next_position.0.saturating_sub(1);
1023        if last == 0 { Ok(None) } else { Ok(Some(last)) }
1024    }
1025
1026    async fn get_tracking_info(&self, source: String) -> DcbResult<Option<u64>> {
1027        let db = UmaDb::from_arc(self.mvcc.clone());
1028        db.get_tracking_info(&source)
1029    }
1030
1031    pub async fn append(
1032        &self,
1033        events: Vec<DcbEvent>,
1034        condition: Option<DcbAppendCondition>,
1035        tracking_info: Option<TrackingInfo>,
1036    ) -> DcbResult<u64> {
1037        // Concurrent pre-check of the given condition using a reader in a blocking thread.
1038        let pre_append_decision = if let Some(mut given_condition) = condition {
1039            let reader = self.mvcc.reader()?;
1040            let current_head = {
1041                let last = reader.next_position.0.saturating_sub(1);
1042                if last == 0 { None } else { Some(last) }
1043            };
1044
1045            // Perform conditional read on the snapshot (limit 1) starting after the given position
1046            let from = given_condition.after.map(|after| Position(after + 1));
1047            let empty_dirty = std::collections::HashMap::new();
1048            let found = read_conditional(
1049                &self.mvcc,
1050                &empty_dirty,
1051                reader.events_tree_root_id,
1052                reader.tags_tree_root_id,
1053                given_condition.fail_if_events_match.clone(),
1054                from,
1055                false,
1056                Some(1),
1057                false,
1058            )?;
1059
1060            if let Some(matched) = found.first() {
1061                // Found one event — consider if the request is idempotent...
1062                match is_request_idempotent(
1063                    &self.mvcc,
1064                    &empty_dirty,
1065                    reader.events_tree_root_id,
1066                    reader.tags_tree_root_id,
1067                    &events,
1068                    given_condition.fail_if_events_match.clone(),
1069                    from,
1070                ) {
1071                    Ok(Some(last_recorded_position)) => {
1072                        // Request is idempotent; skip actual append
1073                        PreAppendDecision::AlreadyAppended(last_recorded_position)
1074                    }
1075                    Ok(None) => {
1076                        // Integrity violation
1077                        let msg = format!(
1078                            "condition: {:?} matched: {:?}",
1079                            given_condition.clone(),
1080                            matched,
1081                        );
1082                        return Err(DcbError::IntegrityError(msg));
1083                    }
1084                    Err(err) => {
1085                        // Propagate underlying read error
1086                        return Err(err);
1087                    }
1088                }
1089            } else {
1090                // No match found: we can advance 'after' to the current head observed by this reader
1091                let new_after = std::cmp::max(
1092                    given_condition.after.unwrap_or(0),
1093                    current_head.unwrap_or(0),
1094                );
1095                given_condition.after = Some(new_after);
1096
1097                PreAppendDecision::UseCondition(Some(given_condition))
1098            }
1099        } else {
1100            // No condition provided at all
1101            PreAppendDecision::UseCondition(None)
1102        };
1103
1104        // Handle the pre-check decision
1105        match pre_append_decision {
1106            PreAppendDecision::AlreadyAppended(last_found_position) => {
1107                // ✅ Request was idempotent — just return the existing position.
1108                Ok(last_found_position)
1109            }
1110            PreAppendDecision::UseCondition(adjusted_condition) => {
1111                // ✅ Proceed with append operation on the writer thread.
1112                let (response_tx, response_rx) = oneshot::channel();
1113
1114                self.writer_request_tx
1115                    .send(WriterRequest::Append {
1116                        events,
1117                        condition: adjusted_condition,
1118                        tracking_info,
1119                        response_tx,
1120                    })
1121                    .await
1122                    .map_err(|_| {
1123                        DcbError::Io(std::io::Error::other(
1124                            "failed to send append request to EventStore thread",
1125                        ))
1126                    })?;
1127
1128                response_rx.await.map_err(|_| {
1129                    DcbError::Io(std::io::Error::other(
1130                        "failed to receive append response from EventStore thread",
1131                    ))
1132                })?
1133            }
1134        }
1135    }
1136
1137    fn watch_head(&self) -> watch::Receiver<Option<u64>> {
1138        self.head_watch_tx.subscribe()
1139    }
1140
1141    #[allow(dead_code)]
1142    async fn shutdown(&self) {
1143        let _ = self.writer_request_tx.send(WriterRequest::Shutdown).await;
1144    }
1145}
1146
// Clone implementation for RequestHandler
1148impl Clone for RequestHandler {
1149    fn clone(&self) -> Self {
1150        Self {
1151            mvcc: self.mvcc.clone(),
1152            head_watch_tx: self.head_watch_tx.clone(),
1153            writer_request_tx: self.writer_request_tx.clone(),
1154        }
1155    }
1156}
1157
/// Outcome of the concurrent pre-check performed in `append` before
/// dispatching a request to the writer thread.
#[derive(Debug)]
enum PreAppendDecision {
    /// Proceed with this (possibly adjusted) condition
    UseCondition(Option<DcbAppendCondition>),
    /// Skip append operation because the request was idempotent; return last recorded position
    AlreadyAppended(u64),
}