umadb_server/
lib.rs

1use futures::Stream;
2use std::fs;
3use std::path::Path;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::thread;
7use tokio::sync::{mpsc, oneshot, watch};
8use tokio_stream::wrappers::ReceiverStream;
9use tonic::transport::{Identity, ServerTlsConfig};
10use tonic::{Request, Response, Status, transport::Server};
11
12use umadb_core::db::{
13    DEFAULT_DB_FILENAME, DEFAULT_PAGE_SIZE, UmaDB, is_request_idempotent, read_conditional,
14};
15use umadb_core::mvcc::Mvcc;
16use umadb_dcb::{DCBAppendCondition, DCBError, DCBEvent, DCBQuery, DCBResult, DCBSequencedEvent};
17
18use tokio::runtime::Runtime;
19use umadb_core::common::Position;
20use umadb_proto::{
21    AppendRequestProto, AppendResponseProto, HeadRequestProto, HeadResponseProto, ReadRequestProto,
22    ReadResponseProto, SequencedEventProto, UmaDbService, UmaDbServiceServer,
23    status_from_dcb_error,
24};
25
// Maximum number of events coalesced into a single writer-thread batch/commit.
const APPEND_BATCH_MAX_EVENTS: usize = 2000;
// Events per streamed read response when the client does not request a batch size.
const READ_RESPONSE_BATCH_SIZE_DEFAULT: u32 = 100;
// Upper bound applied (via clamp) to client-requested read batch sizes.
const READ_RESPONSE_BATCH_SIZE_MAX: u32 = 5000;
29
// Optional TLS configuration helpers
#[derive(Clone, Debug)]
pub struct ServerTlsOptions {
    /// PEM-encoded server certificate.
    pub cert_pem: Vec<u8>,
    /// PEM-encoded private key matching the certificate.
    pub key_pem: Vec<u8>,
}
36
37fn build_server_builder_with_options(tls: Option<ServerTlsOptions>) -> Server {
38    use std::time::Duration;
39    let mut server_builder = Server::builder()
40        .http2_keepalive_interval(Some(Duration::from_secs(5)))
41        .http2_keepalive_timeout(Some(Duration::from_secs(10)))
42        .initial_stream_window_size(Some(4 * 1024 * 1024))
43        .initial_connection_window_size(Some(8 * 1024 * 1024))
44        .tcp_nodelay(true)
45        .concurrency_limit_per_connection(1024);
46
47    if let Some(opts) = tls {
48        let identity = Identity::from_pem(opts.cert_pem, opts.key_pem);
49        server_builder = server_builder
50            .tls_config(ServerTlsConfig::new().identity(identity))
51            .expect("failed to apply TLS config");
52    }
53
54    server_builder
55}
56
57// Function to start the gRPC server with a shutdown signal
58pub async fn start_server<P: AsRef<Path> + Send + 'static>(
59    path: P,
60    addr: &str,
61    shutdown_rx: oneshot::Receiver<()>,
62) -> Result<(), Box<dyn std::error::Error>> {
63    start_server_internal(path, addr, shutdown_rx, None).await
64}
65
66/// Start server with TLS using PEM-encoded cert and key.
67pub async fn start_server_secure<P: AsRef<Path> + Send + 'static>(
68    path: P,
69    addr: &str,
70    shutdown_rx: oneshot::Receiver<()>,
71    cert_pem: Vec<u8>,
72    key_pem: Vec<u8>,
73) -> Result<(), Box<dyn std::error::Error>> {
74    let tls = ServerTlsOptions { cert_pem, key_pem };
75    start_server_internal(path, addr, shutdown_rx, Some(tls)).await
76}
77
78/// Convenience: load cert and key from filesystem paths
79pub async fn start_server_secure_from_files<
80    P: AsRef<Path> + Send + 'static,
81    CP: AsRef<Path>,
82    KP: AsRef<Path>,
83>(
84    path: P,
85    addr: &str,
86    shutdown_rx: oneshot::Receiver<()>,
87    cert_path: CP,
88    key_path: KP,
89) -> Result<(), Box<dyn std::error::Error>> {
90    let cert_path_ref = cert_path.as_ref();
91    let cert_pem = fs::read(cert_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
92        format!(
93            "Failed to open TLS certificate file '{}': {}",
94            cert_path_ref.display(),
95            e
96        )
97        .into()
98    })?;
99
100    let key_path_ref = key_path.as_ref();
101    let key_pem = fs::read(key_path_ref).map_err(|e| -> Box<dyn std::error::Error> {
102        format!(
103            "Failed to open TLS key file '{}': {}",
104            key_path_ref.display(),
105            e
106        )
107        .into()
108    })?;
109    start_server_secure(path, addr, shutdown_rx, cert_pem, key_pem).await
110}
111
/// Shared implementation behind the public `start_server*` entry points.
///
/// Parses the address, opens the database, registers the gRPC health service,
/// and serves until `shutdown_rx` resolves (or its sender is dropped). On
/// shutdown, health flips to NOT_SERVING and a broadcast tells all
/// subscription tasks to terminate.
async fn start_server_internal<P: AsRef<Path> + Send + 'static>(
    path: P,
    addr: &str,
    shutdown_rx: oneshot::Receiver<()>,
    tls: Option<ServerTlsOptions>,
) -> Result<(), Box<dyn std::error::Error>> {
    let addr = addr.parse()?;
    // Create a shutdown broadcast channel for terminating ongoing subscriptions
    let (srv_shutdown_tx, srv_shutdown_rx) = watch::channel(false);
    let server = UmaDBServer::new(path, srv_shutdown_rx)?;
    // NOTE(review): the two branches use inconsistent wording ("Started UmaDB
    // server ..." vs "UmaDB server ..."), and both print before
    // `serve_with_shutdown` has actually bound the listener.
    if tls.is_some() {
        println!("Started UmaDB server (with TLS) listening on {addr}");
    } else {
        println!("UmaDB server (insecure) listening on {addr}");
    }

    let mut server_builder = build_server_builder_with_options(tls);

    // gRPC Health service setup
    use tonic_health::ServingStatus; // server API expects this enum
    let (health_reporter, health_service) = tonic_health::server::health_reporter();
    // Set overall ("") and service-specific health to SERVING
    health_reporter
        .set_service_status("", ServingStatus::Serving)
        .await;
    health_reporter
        .set_service_status("umadb.UmaDBService", ServingStatus::Serving)
        .await;
    let health_reporter_for_shutdown = health_reporter.clone();

    server_builder
        .add_service(health_service)
        .add_service(server.into_service())
        .serve_with_shutdown(addr, async move {
            // Wait for an external shutdown trigger
            let _ = shutdown_rx.await;
            // Mark health as NOT_SERVING before shutdown
            let _ = health_reporter_for_shutdown
                .set_service_status("", ServingStatus::NotServing)
                .await;
            let _ = health_reporter_for_shutdown
                .set_service_status("umadb.UmaDBService", ServingStatus::NotServing)
                .await;
            // Broadcast shutdown to all subscription tasks
            let _ = srv_shutdown_tx.send(true);
            println!("UmaDB server shutdown complete");
        })
        .await?;

    Ok(())
}
163
// gRPC server implementation
pub struct UmaDBServer {
    /// Handles reads and appends; cheap to clone into per-request tasks.
    request_handler: RequestHandler,
    /// Becomes `true` when the server is shutting down; observed by
    /// long-lived subscription streams so they terminate promptly.
    shutdown_watch_rx: watch::Receiver<bool>,
}
169
170impl UmaDBServer {
171    pub fn new<P: AsRef<Path> + Send + 'static>(
172        path: P,
173        shutdown_rx: watch::Receiver<bool>,
174    ) -> std::io::Result<Self> {
175        let command_handler = RequestHandler::new(path)?;
176        Ok(Self {
177            request_handler: command_handler,
178            shutdown_watch_rx: shutdown_rx,
179        })
180    }
181
182    pub fn into_service(self) -> UmaDbServiceServer<Self> {
183        UmaDbServiceServer::new(self)
184    }
185}
186
#[tonic::async_trait]
impl UmaDbService for UmaDBServer {
    /// Boxed stream of read responses, fed by the task spawned in `read`.
    type ReadStream =
        Pin<Box<dyn Stream<Item = Result<ReadResponseProto, Status>> + Send + 'static>>;

    /// Handle a read request by streaming events back in batches.
    ///
    /// Non-subscription reads capture the current head first and never return
    /// events past it (point-in-time snapshot semantics). Subscription reads
    /// wait on head updates and keep streaming until the client disconnects,
    /// the server shuts down, or the overall `limit` is exhausted.
    async fn read(
        &self,
        request: Request<ReadRequestProto>,
    ) -> Result<Response<Self::ReadStream>, Status> {
        let read_request = request.into_inner();

        // Convert protobuf query to DCB types
        let mut query: Option<DCBQuery> = read_request.query.map(|q| q.into());
        let start = read_request.start;
        let backwards = read_request.backwards.unwrap_or(false);
        let limit = read_request.limit;
        // Cap requested batch size to [1, READ_RESPONSE_BATCH_SIZE_MAX].
        let batch_size = read_request
            .batch_size
            .unwrap_or(READ_RESPONSE_BATCH_SIZE_DEFAULT)
            .clamp(1, READ_RESPONSE_BATCH_SIZE_MAX);
        let subscribe = read_request.subscribe.unwrap_or(false);

        // Create a channel for streaming responses (deeper buffer to reduce backpressure under concurrency)
        let (tx, rx) = mpsc::channel(2048);
        // Clone the request handler (cheap: Arc + channel handles).
        let request_handler = self.request_handler.clone();
        // Clone the shutdown watch receiver so this stream notices server shutdown.
        let mut shutdown_watch_rx = self.shutdown_watch_rx.clone();

        // Spawn a task to handle the read operation and stream multiple batches
        tokio::spawn(async move {
            // Ensure we can reuse the same query across batches
            let query_clone = query.take();
            // Cursor: position the next batch starts from.
            let mut next_start = start;
            // Whether at least one response has been sent; an empty first
            // batch still gets a head-only response, later ones do not.
            let mut sent_any = false;
            // Remaining overall event budget; u32::MAX when no limit given.
            let mut remaining_limit = limit.unwrap_or(u32::MAX);
            // Create a watch receiver for head updates (for subscriptions)
            // TODO: Make this an Option and only do this for subscriptions?
            let mut head_rx = request_handler.watch_head();
            // If non-subscription read, capture head to preserve point-in-time semantics
            let captured_head = if !subscribe {
                request_handler.head().await.unwrap_or(None)
            } else {
                None
            };
            loop {
                // If this is a subscription, exit if the client
                // has gone away or the server is shutting down.
                if subscribe {
                    if tx.is_closed() {
                        break;
                    }
                    if *shutdown_watch_rx.borrow() {
                        break;
                    }
                }
                // Determine per-iteration limit.
                let read_limit = remaining_limit.min(batch_size);
                // If subscription and remaining exhausted (limit reached), terminate
                if subscribe && limit.is_some() && remaining_limit == 0 {
                    break;
                }
                match request_handler
                    .read(query_clone.clone(), next_start, backwards, Some(read_limit))
                    .await
                {
                    Ok((dcb_sequenced_events, head)) => {
                        // Capture the original length before consuming events
                        let original_len = dcb_sequenced_events.len();

                        // Filter and map events, discarding those with position > captured_head
                        let sequenced_event_protos: Vec<SequencedEventProto> = dcb_sequenced_events
                            .into_iter()
                            .filter(|e| {
                                if let Some(h) = captured_head {
                                    e.position <= h
                                } else {
                                    true
                                }
                            })
                            .map(SequencedEventProto::from)
                            .collect();

                        // A shrink in length means an event past the captured
                        // head was dropped, i.e. the snapshot boundary was hit.
                        let reached_captured_head = if captured_head.is_some() {
                            // Check if we filtered out any events
                            sequenced_event_protos.len() < original_len
                        } else {
                            false
                        };

                        // Calculate head to send based on context
                        // For subscriptions: use current head
                        // For unlimited non-subscription reads: use captured_head
                        // For limited reads: use last event position (or current head if empty)
                        let last_event_position = sequenced_event_protos.last().map(|e| e.position);
                        let head_to_send = if subscribe {
                            head
                        } else if limit.is_none() {
                            captured_head
                        } else {
                            last_event_position.or(head)
                        };

                        if sequenced_event_protos.is_empty() {
                            // Only send an empty response to communicate head if this is the first
                            if !sent_any {
                                let response = ReadResponseProto {
                                    events: vec![],
                                    head: head_to_send,
                                };
                                let _ = tx.send(Ok(response)).await;
                            }
                            // For subscriptions, wait for new events instead of terminating
                            if subscribe {
                                // Wait until the published head reaches next_start.
                                loop {
                                    // Stop if the channel is closed.
                                    if tx.is_closed() {
                                        break;
                                    }
                                    let current_head = *head_rx.borrow();
                                    if current_head
                                        .map(|h| h >= next_start.unwrap_or(1))
                                        .unwrap_or(false)
                                    {
                                        break; // Break out of waiting, new events are available.
                                    }
                                    // Wait for either a new head or a server shutdown signal
                                    tokio::select! {
                                        res = head_rx.changed() => {
                                            if res.is_err() { break; }
                                        }
                                        res2 = shutdown_watch_rx.changed() => {
                                            if res2.is_ok() {
                                                // Exit if shutting down.
                                                if *shutdown_watch_rx.borrow() { break; }
                                            } else {
                                                break; // sender dropped
                                            }
                                        }
                                    }
                                }
                                continue;
                            }
                            break;
                        }

                        // Capture values needed after sequenced_event_protos is moved
                        let sent_count = sequenced_event_protos.len() as u32;

                        let response = ReadResponseProto {
                            events: sequenced_event_protos,
                            head: head_to_send,
                        };

                        if tx.send(Ok(response)).await.is_err() {
                            break;
                        }
                        sent_any = true;

                        // Advance the cursor (use a new reader on the next loop iteration)
                        next_start =
                            last_event_position.map(|p| if !backwards { p + 1 } else { p - 1 });

                        // Stop streaming further if we reached the
                        // captured head boundary (non-subscriber only).
                        if reached_captured_head && !subscribe {
                            break;
                        }

                        // Decrease the remaining overall limit if any, and stop if reached
                        if limit.is_some() {
                            if remaining_limit <= sent_count {
                                remaining_limit = 0;
                            } else {
                                remaining_limit -= sent_count;
                            }
                            if remaining_limit == 0 {
                                break;
                            }
                        }

                        // Yield to let other tasks progress under high concurrency
                        tokio::task::yield_now().await;
                    }
                    Err(e) => {
                        // Translate the DCB error to a gRPC status and stop streaming.
                        let _ = tx.send(Err(status_from_dcb_error(&e))).await;
                        break;
                    }
                }
            }
        });

        // Return the receiver as a stream
        Ok(Response::new(
            Box::pin(ReceiverStream::new(rx)) as Self::ReadStream
        ))
    }

    /// Append events, optionally guarded by an append condition; returns the
    /// resulting position or a gRPC status derived from the DCB error.
    async fn append(
        &self,
        request: Request<AppendRequestProto>,
    ) -> Result<Response<AppendResponseProto>, Status> {
        let req = request.into_inner();

        // Convert protobuf types to API types; a conversion failure maps to a status.
        let events: Vec<DCBEvent> = match req.events.into_iter().map(|e| e.try_into()).collect() {
            Ok(events) => events,
            Err(e) => {
                return Err(status_from_dcb_error(&e));
            }
        };
        let condition = req.condition.map(|c| c.into());

        // Call the event store append method
        match self.request_handler.append(events, condition).await {
            Ok(position) => Ok(Response::new(AppendResponseProto { position })),
            Err(e) => Err(status_from_dcb_error(&e)),
        }
    }

    /// Return the current head position (last committed event), if any.
    async fn head(
        &self,
        _request: Request<HeadRequestProto>,
    ) -> Result<Response<HeadResponseProto>, Status> {
        // Call the event store head method
        match self.request_handler.head().await {
            Ok(position) => {
                // Return the position as a response
                Ok(Response::new(HeadResponseProto { position }))
            }
            Err(e) => Err(status_from_dcb_error(&e)),
        }
    }
}
423
// Message types for communication between the gRPC server and the request handler's writer thread
enum WriterRequest {
    /// Append a batch of events; the outcome (new position or error) is
    /// delivered back through `response_tx`.
    Append {
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        response_tx: oneshot::Sender<DCBResult<u64>>,
    },
    /// Ask the writer thread to stop processing and exit.
    Shutdown,
}
433
// Thread-safe request handler: a cloneable handle shared by all gRPC request
// tasks. Reads go directly against the shared MVCC; appends are forwarded to
// the dedicated writer thread.
struct RequestHandler {
    /// Shared MVCC storage engine; readers open snapshots concurrently.
    mvcc: Arc<Mvcc>,
    /// Publishes the latest committed head so subscriptions can wake up.
    head_watch_tx: watch::Sender<Option<u64>>,
    /// Sends append/shutdown requests to the dedicated writer thread.
    writer_request_tx: mpsc::Sender<WriterRequest>,
}
440
impl RequestHandler {
    /// Open (or create) the database at `path`, spawn the dedicated writer
    /// thread, and seed the head watch channel from the on-disk header.
    ///
    /// Single-writer design: all appends are funneled through one OS thread,
    /// which coalesces queued requests into batches committed together; reads
    /// run concurrently against the shared `Mvcc`.
    fn new<P: AsRef<Path> + Send + 'static>(path: P) -> std::io::Result<Self> {
        // Create a channel for sending requests to the writer thread
        let (request_tx, mut request_rx) = mpsc::channel::<WriterRequest>(1024);

        // Build a shared Mvcc instance (Arc) upfront so reads can proceed concurrently
        let p = path.as_ref();
        // Accept either a directory (use the default filename inside it) or a file path.
        let file_path = if p.is_dir() {
            p.join(DEFAULT_DB_FILENAME)
        } else {
            p.to_path_buf()
        };
        let mvcc = Arc::new(
            Mvcc::new(&file_path, DEFAULT_PAGE_SIZE, false)
                .map_err(|e| std::io::Error::other(format!("Failed to init LMDB: {e:?}")))?,
        );

        // Initialize the head watch channel with the current head.
        let init_head = {
            let (_, header) = mvcc
                .get_latest_header()
                .map_err(|e| std::io::Error::other(format!("Failed to read header: {e:?}")))?;
            // next_position - 1 is the last committed position; 0 means an empty store.
            let last = header.next_position.0.saturating_sub(1);
            if last == 0 { None } else { Some(last) }
        };
        let (head_tx, _head_rx) = watch::channel::<Option<u64>>(init_head);

        // Spawn a thread for processing writer requests.
        let mvcc_for_writer = mvcc.clone();
        let head_tx_writer = head_tx.clone();
        thread::spawn(move || {
            let db = UmaDB::from_arc(mvcc_for_writer);

            // Create a runtime for processing writer requests.
            let rt = Runtime::new().unwrap();

            // Process writer requests.
            rt.block_on(async {
                while let Some(request) = request_rx.recv().await {
                    match request {
                        WriterRequest::Append {
                            events,
                            condition,
                            response_tx,
                        } => {
                            // One responder per request coalesced into this batch,
                            // index-aligned with `results` below.
                            let mut responders: Vec<oneshot::Sender<DCBResult<u64>>> = Vec::new();

                            let mut total_events = 0;
                            total_events += events.len();

                            let mvcc = &db.mvcc;
                            // Acquire the single writer; on failure report to the
                            // requester and keep the thread alive for later requests.
                            let mut writer = match mvcc.writer() {
                                Ok(writer) => writer,
                                Err(err) => {
                                    let _ = response_tx.send(Err(err));
                                    continue;
                                }
                            };
                            responders.push(response_tx);

                            // Per-request results; same order as `responders`.
                            let mut results: Vec<DCBResult<u64>> = Vec::new();

                            UmaDB::process_append_request(
                                events,
                                condition,
                                false,
                                mvcc,
                                &mut writer,
                                &mut results,
                            );

                            // Drain the channel for more pending writer requests without awaiting.
                            // Important: do not drop a popped request when hitting the batch limit.
                            // We stop draining BEFORE attempting to recv if we've reached the limit.
                            loop {
                                if total_events >= APPEND_BATCH_MAX_EVENTS {
                                    break;
                                }
                                match request_rx.try_recv() {
                                    Ok(WriterRequest::Append {
                                        events,
                                        condition,
                                        response_tx,
                                    }) => {
                                        let ev_len = events.len();
                                        UmaDB::process_append_request(
                                            events,
                                            condition,
                                            false,
                                            mvcc,
                                            &mut writer,
                                            &mut results,
                                        );
                                        responders.push(response_tx);
                                        total_events += ev_len;
                                    }
                                    Ok(WriterRequest::Shutdown) => {
                                        // NOTE(review): this pops the Shutdown message off the
                                        // channel and then discards it — the outer loop never
                                        // sees it, so the thread only exits on a LATER Shutdown
                                        // or when all senders are dropped. If "finish this batch,
                                        // then stop" is intended, a flag should be set here and
                                        // checked after the commit below. TODO confirm intent.
                                        break;
                                    }
                                    Err(mpsc::error::TryRecvError::Empty) => break,
                                    Err(mpsc::error::TryRecvError::Disconnected) => break,
                                }
                            }

                            // Single commit at the end of the batch
                            let batch_result = match mvcc.commit(&mut writer) {
                                Ok(_) => Ok(results),
                                Err(err) => Err(err),
                            };

                            match batch_result {
                                Ok(results) => {
                                    // Send individual results back to requesters
                                    // Also compute the new head as the maximum successful last position in this batch
                                    let mut max_ok: Option<u64> = None;
                                    for (res, tx) in results.into_iter().zip(responders.into_iter())
                                    {
                                        if let Ok(v) = &res {
                                            max_ok = Some(max_ok.map_or(*v, |m| m.max(*v)));
                                        }
                                        let _ = tx.send(res);
                                    }
                                    // After a successful batch commit, publish the updated head.
                                    if let Some(h) = max_ok {
                                        let _ = head_tx_writer.send(Some(h));
                                    }
                                }
                                Err(e) => {
                                    // If the batch failed as a whole (e.g., commit failed), propagate the SAME error to all responders.
                                    // DCBError is not Clone (contains io::Error), so reconstruct a best-effort copy by using its Display text
                                    // for Io and cloning data for other variants.
                                    let total = responders.len();
                                    let mut iter = responders.into_iter();
                                    for _ in 0..total {
                                        if let Some(tx) = iter.next() {
                                            let _ = tx.send(Err(clone_dcb_error(&e)));
                                        }
                                    }
                                }
                            }
                        }
                        WriterRequest::Shutdown => {
                            break;
                        }
                    }
                }
            });
        });

        Ok(Self {
            mvcc,
            head_watch_tx: head_tx,
            writer_request_tx: request_tx,
        })
    }

    /// Read events matching `query` from a fresh MVCC reader snapshot.
    ///
    /// Returns the events plus a head value: for unlimited reads this is the
    /// snapshot's last committed position (`None` when the store is empty);
    /// for limited reads it is the position of the last event returned.
    ///
    /// NOTE(review): despite being `async`, the body performs synchronous
    /// storage reads on the current task.
    async fn read(
        &self,
        query: Option<DCBQuery>,
        start: Option<u64>,
        backwards: bool,
        limit: Option<u32>,
    ) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
        let reader = self.mvcc.reader()?;
        let last_committed_position = reader.next_position.0.saturating_sub(1);

        // An absent query becomes an empty query (no filter items).
        let q = query.unwrap_or(DCBQuery { items: vec![] });
        let start_position = start.map(Position);

        let events = read_conditional(
            &self.mvcc,
            &std::collections::HashMap::new(), // no dirty pages: reading committed state
            reader.events_tree_root_id,
            reader.tags_tree_root_id,
            q,
            start_position,
            backwards,
            limit,
            false,
        )
        .map_err(|e| DCBError::Corruption(format!("{e}")))?;

        let head = if limit.is_none() {
            if last_committed_position == 0 {
                None
            } else {
                Some(last_committed_position)
            }
        } else {
            events.last().map(|e| e.position)
        };

        Ok((events, head))
    }

    /// Return the last committed position, or `None` when the store is empty.
    async fn head(&self) -> DCBResult<Option<u64>> {
        let (_, header) = self
            .mvcc
            .get_latest_header()
            .map_err(|e| DCBError::Corruption(format!("{e}")))?;
        let last = header.next_position.0.saturating_sub(1);
        if last == 0 { Ok(None) } else { Ok(Some(last)) }
    }

    /// Append `events`, optionally guarded by `condition`.
    ///
    /// Performs a concurrent pre-check of the condition against a reader
    /// snapshot before handing the request to the writer thread: idempotent
    /// repeats short-circuit with the previously recorded position,
    /// conflicting matches fail fast with an integrity error, and clean
    /// conditions get their `after` advanced to the observed head so the
    /// writer only re-checks the window after this snapshot.
    pub async fn append(
        &self,
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
    ) -> DCBResult<u64> {
        // Concurrent pre-check of the given condition using a reader snapshot.
        // NOTE(review): runs inline on this task, not on a blocking thread.
        let pre_append_decision = if let Some(mut given_condition) = condition {
            let reader = self.mvcc.reader()?;
            let current_head = {
                let last = reader.next_position.0.saturating_sub(1);
                if last == 0 { None } else { Some(last) }
            };

            // Perform conditional read on the snapshot (limit 1) starting after the given position
            let from = given_condition.after.map(|after| Position(after + 1));
            let empty_dirty = std::collections::HashMap::new();
            let found = read_conditional(
                &self.mvcc,
                &empty_dirty,
                reader.events_tree_root_id,
                reader.tags_tree_root_id,
                given_condition.fail_if_events_match.clone(),
                from,
                false,
                Some(1),
                false,
            )?;

            if let Some(matched) = found.first() {
                // Found one event — consider if the request is idempotent...
                match is_request_idempotent(
                    &self.mvcc,
                    &empty_dirty,
                    reader.events_tree_root_id,
                    reader.tags_tree_root_id,
                    &events,
                    given_condition.fail_if_events_match.clone(),
                    from,
                ) {
                    Ok(Some(last_recorded_position)) => {
                        // Request is idempotent; skip actual append
                        PreAppendDecision::AlreadyAppended(last_recorded_position)
                    }
                    Ok(None) => {
                        // Integrity violation
                        let msg = format!(
                            "condition: {:?} matched: {:?}",
                            given_condition.clone(),
                            matched,
                        );
                        return Err(DCBError::IntegrityError(msg));
                    }
                    Err(err) => {
                        // Propagate underlying read error
                        return Err(err);
                    }
                }
            } else {
                // No match found: we can advance 'after' to the current head observed by this reader
                let new_after = std::cmp::max(
                    given_condition.after.unwrap_or(0),
                    current_head.unwrap_or(0),
                );
                given_condition.after = Some(new_after);

                PreAppendDecision::UseCondition(Some(given_condition))
            }
        } else {
            // No condition provided at all
            PreAppendDecision::UseCondition(None)
        };

        // Handle the pre-check decision
        match pre_append_decision {
            PreAppendDecision::AlreadyAppended(last_found_position) => {
                // ✅ Request was idempotent — just return the existing position.
                Ok(last_found_position)
            }
            PreAppendDecision::UseCondition(adjusted_condition) => {
                // ✅ Proceed with append operation on the writer thread.
                let (response_tx, response_rx) = oneshot::channel();

                self.writer_request_tx
                    .send(WriterRequest::Append {
                        events,
                        condition: adjusted_condition,
                        response_tx,
                    })
                    .await
                    .map_err(|_| {
                        DCBError::Io(std::io::Error::other(
                            "Failed to send append request to EventStore thread",
                        ))
                    })?;

                response_rx.await.map_err(|_| {
                    DCBError::Io(std::io::Error::other(
                        "Failed to receive append response from EventStore thread",
                    ))
                })?
            }
        }
    }

    /// Subscribe to head-position updates published after each committed batch.
    fn watch_head(&self) -> watch::Receiver<Option<u64>> {
        self.head_watch_tx.subscribe()
    }

    /// Ask the writer thread to exit (best effort; errors are ignored).
    #[allow(dead_code)]
    async fn shutdown(&self) {
        let _ = self.writer_request_tx.send(WriterRequest::Shutdown).await;
    }
}
768
/// Best-effort deep copy of a `DCBError`.
///
/// Needed because `DCBError` is not `Clone` (the `Io` variant wraps
/// `std::io::Error`): the `Io` variant is reconstructed from its Display
/// text; all other variants clone their payload directly.
fn clone_dcb_error(src: &DCBError) -> DCBError {
    match src {
        DCBError::Io(err) => DCBError::Io(std::io::Error::other(err.to_string())),
        DCBError::IntegrityError(s) => DCBError::IntegrityError(s.clone()),
        DCBError::Corruption(s) => DCBError::Corruption(s.clone()),
        DCBError::PageNotFound(id) => DCBError::PageNotFound(*id),
        DCBError::DirtyPageNotFound(id) => DCBError::DirtyPageNotFound(*id),
        DCBError::RootIDMismatch(old_id, new_id) => DCBError::RootIDMismatch(*old_id, *new_id),
        DCBError::DatabaseCorrupted(s) => DCBError::DatabaseCorrupted(s.clone()),
        DCBError::InternalError(s) => DCBError::InternalError(s.clone()),
        DCBError::SerializationError(s) => DCBError::SerializationError(s.clone()),
        DCBError::DeserializationError(s) => DCBError::DeserializationError(s.clone()),
        DCBError::PageAlreadyFreed(id) => DCBError::PageAlreadyFreed(*id),
        DCBError::PageAlreadyDirty(id) => DCBError::PageAlreadyDirty(*id),
        DCBError::TransportError(err) => DCBError::TransportError(err.clone()),
        DCBError::CancelledByUser() => DCBError::CancelledByUser(),
    }
}
787
788// Clone implementation for EventStoreHandle
789impl Clone for RequestHandler {
790    fn clone(&self) -> Self {
791        Self {
792            mvcc: self.mvcc.clone(),
793            head_watch_tx: self.head_watch_tx.clone(),
794            writer_request_tx: self.writer_request_tx.clone(),
795        }
796    }
797}
798
/// Outcome of the append pre-check performed against a reader snapshot
/// in `RequestHandler::append`.
#[derive(Debug)]
enum PreAppendDecision {
    /// Proceed with this (possibly adjusted) condition
    UseCondition(Option<DCBAppendCondition>),
    /// Skip append operation because the request was idempotent; return last recorded position
    AlreadyAppended(u64),
}