Skip to main content

ursula_runtime/
runtime.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::Instant;
5
6use bytes::Bytes;
7use tokio::sync::{Semaphore, mpsc, oneshot};
8use tokio::task::JoinSet;
9use ursula_shard::{BucketStreamId, CoreId, RaftGroupId, ShardId, ShardPlacement, StaticShardMap};
10use ursula_stream::{ColdChunkRef, ColdFlushCandidate, StreamErrorCode};
11
12use crate::cold_store::{ColdStoreHandle, new_cold_chunk_path};
13use crate::command::GroupSnapshot;
14use crate::core_worker::{CoreCommand, CoreMailbox, CoreWorker, WaitReadCancel};
15use crate::engine::{GroupEngineError, GroupEngineFactory};
16use crate::engine_in_memory::InMemoryGroupEngineFactory;
17use crate::error::{RuntimeError, map_fork_source_ref_error};
18use crate::metrics::{
19    COLD_FLUSH_GROUP_BATCH_MAX_CHUNKS, RuntimeMailboxSnapshot, RuntimeMetrics, RuntimeMetricsInner,
20    elapsed_ns, is_stale_cold_flush_candidate_error,
21};
22use crate::request::{
23    AppendBatchRequest, AppendBatchResponse, AppendExternalRequest, AppendRequest, AppendResponse,
24    BootstrapStreamRequest, BootstrapStreamResponse, CloseStreamRequest, CloseStreamResponse,
25    ColdWriteAdmission, CreateStreamExternalRequest, CreateStreamRequest, CreateStreamResponse,
26    DeleteSnapshotRequest, DeleteStreamRequest, DeleteStreamResponse, FlushColdRequest,
27    FlushColdResponse, ForkRefResponse, HeadStreamRequest, HeadStreamResponse,
28    PlanColdFlushRequest, PlanGroupColdFlushRequest, PublishSnapshotRequest,
29    PublishSnapshotResponse, ReadSnapshotRequest, ReadSnapshotResponse, ReadStreamRequest,
30    ReadStreamResponse,
31};
32
/// Configuration consumed by [`ShardRuntime::spawn`] and its `spawn_with_*` variants.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Core count passed to `StaticShardMap::new`; the resulting map's core count decides
    /// how many core workers and mailboxes are spawned.
    pub core_count: usize,
    /// Raft group count passed to `StaticShardMap::new`.
    pub raft_group_count: usize,
    /// Capacity of each core's command channel, of each worker's group mailboxes, and the
    /// permit count of the shared read-materialization semaphore. Values below 1 are
    /// treated as 1 at spawn time.
    pub mailbox_capacity: usize,
    /// How core workers are executed; forwarded to `spawn_core_worker` for every core.
    pub threading: RuntimeThreading,
    /// Forwarded to every worker as `ColdWriteAdmission::max_hot_bytes_per_group`.
    /// `RuntimeConfig::new` leaves this `None` (presumably "no limit" — confirm in the worker).
    pub cold_max_hot_bytes_per_group: Option<u64>,
    /// Forwarded to every core worker as its live-read waiter limit; `RuntimeConfig::new`
    /// sets `Some(65_536)`.
    pub live_read_max_waiters_per_core: Option<u64>,
}
42
43impl RuntimeConfig {
44    pub fn new(core_count: usize, raft_group_count: usize) -> Self {
45        Self {
46            core_count,
47            raft_group_count,
48            mailbox_capacity: 1024,
49            threading: RuntimeThreading::ThreadPerCore,
50            cold_max_hot_bytes_per_group: None,
51            live_read_max_waiters_per_core: Some(65_536),
52        }
53    }
54
55    pub fn with_cold_max_hot_bytes_per_group(mut self, value: Option<u64>) -> Self {
56        self.cold_max_hot_bytes_per_group = value;
57        self
58    }
59
60    pub fn with_live_read_max_waiters_per_core(mut self, value: Option<u64>) -> Self {
61        self.live_read_max_waiters_per_core = value;
62        self
63    }
64}
65
/// Execution model for core workers; handed to `spawn_core_worker` for every core when a
/// [`ShardRuntime`] is spawned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeThreading {
    /// Default chosen by `RuntimeConfig::new`. Presumably gives each core worker its own
    /// dedicated thread — confirm against `spawn_core_worker`.
    ThreadPerCore,
    /// Presumably runs core workers on an already-running Tokio runtime — confirm against
    /// `spawn_core_worker`.
    HostedTokio,
}
71
/// Handle to a set of spawned core workers that routes each stream operation to the
/// mailbox of the core owning that stream.
///
/// Clones share the metrics and the waiter-id counter (both behind `Arc`) and carry
/// clones of every core mailbox sender, so all clones address the same workers.
#[derive(Debug, Clone)]
pub struct ShardRuntime {
    /// Maps stream ids (and raft groups) to placements, including the owning core.
    shard_map: StaticShardMap,
    /// One mailbox per core, indexed by `CoreId.0`.
    mailboxes: Vec<CoreMailbox>,
    /// Metrics shared with every core worker.
    metrics: Arc<RuntimeMetricsInner>,
    /// Source of waiter ids for `wait_read_stream`; starts at 1.
    next_waiter_id: Arc<AtomicU64>,
    /// Cold store used by the cold-flush paths; when `None`, flushing fails with
    /// `RuntimeError::ColdStoreConfig`.
    cold_store: Option<ColdStoreHandle>,
}
80
81impl ShardRuntime {
82    pub fn spawn(config: RuntimeConfig) -> Result<Self, RuntimeError> {
83        Self::spawn_with_engine_factory(config, InMemoryGroupEngineFactory::default())
84    }
85
86    pub fn spawn_with_engine_factory(
87        config: RuntimeConfig,
88        engine_factory: impl GroupEngineFactory,
89    ) -> Result<Self, RuntimeError> {
90        Self::spawn_with_engine_factory_and_cold_store(config, engine_factory, None)
91    }
92
93    pub fn spawn_with_engine_factory_and_cold_store(
94        config: RuntimeConfig,
95        engine_factory: impl GroupEngineFactory,
96        cold_store: Option<ColdStoreHandle>,
97    ) -> Result<Self, RuntimeError> {
98        let shard_map = StaticShardMap::new(config.core_count, config.raft_group_count)?;
99        let metrics = Arc::new(RuntimeMetricsInner::new(
100            usize::from(shard_map.core_count()),
101            usize::try_from(shard_map.raft_group_count()).expect("u32 fits usize"),
102        ));
103        let cold_write_admission = ColdWriteAdmission {
104            max_hot_bytes_per_group: config.cold_max_hot_bytes_per_group,
105        };
106        let engine_factory: Arc<dyn GroupEngineFactory> = Arc::new(engine_factory);
107        let read_materialization = Arc::new(Semaphore::new(config.mailbox_capacity.max(1)));
108        let mut mailboxes = Vec::with_capacity(usize::from(shard_map.core_count()));
109        for raw_core_id in 0..shard_map.core_count() {
110            let core_id = CoreId(raw_core_id);
111            let (tx, rx) = mpsc::channel(config.mailbox_capacity.max(1));
112            let worker = CoreWorker {
113                core_id,
114                rx,
115                engine_factory: engine_factory.clone(),
116                groups: HashMap::new(),
117                metrics: metrics.clone(),
118                group_mailbox_capacity: config.mailbox_capacity.max(1),
119                cold_write_admission,
120                live_read_max_waiters_per_core: config.live_read_max_waiters_per_core,
121                read_materialization: read_materialization.clone(),
122            };
123            spawn_core_worker(config.threading, worker)?;
124            mailboxes.push(CoreMailbox { core_id, tx });
125        }
126        Ok(Self {
127            shard_map,
128            mailboxes,
129            metrics,
130            next_waiter_id: Arc::new(AtomicU64::new(1)),
131            cold_store,
132        })
133    }
134
135    pub fn locate(&self, stream_id: &BucketStreamId) -> ShardPlacement {
136        self.shard_map.locate(stream_id)
137    }
138
139    pub fn has_cold_store(&self) -> bool {
140        self.cold_store.is_some()
141    }
142
143    pub fn cold_store(&self) -> Option<ColdStoreHandle> {
144        self.cold_store.clone()
145    }
146
147    pub async fn create_stream(
148        &self,
149        request: CreateStreamRequest,
150    ) -> Result<CreateStreamResponse, RuntimeError> {
151        if request.forked_from.is_some() {
152            return self.create_fork_stream(request).await;
153        }
154        self.create_stream_on_owner(request).await
155    }
156
157    pub async fn create_stream_external(
158        &self,
159        request: CreateStreamExternalRequest,
160    ) -> Result<CreateStreamResponse, RuntimeError> {
161        let placement = self.shard_map.locate(&request.stream_id);
162        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
163        let (response_tx, response_rx) = oneshot::channel();
164        self.send_core_command(
165            mailbox,
166            CoreCommand::CreateExternal {
167                request,
168                placement,
169                response_tx,
170            },
171            response_rx,
172        )
173        .await
174    }
175
176    async fn create_stream_on_owner(
177        &self,
178        request: CreateStreamRequest,
179    ) -> Result<CreateStreamResponse, RuntimeError> {
180        let placement = self.shard_map.locate(&request.stream_id);
181        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
182        let (response_tx, response_rx) = oneshot::channel();
183        self.send_core_command(
184            mailbox,
185            CoreCommand::CreateStream {
186                request,
187                placement,
188                response_tx,
189            },
190            response_rx,
191        )
192        .await
193    }
194
/// Creates `request.stream_id` as a fork of `request.forked_from`.
///
/// Steps, in order:
/// 1. Head the source stream; errors are mapped through `map_fork_source_ref_error`.
/// 2. If the caller set an explicit content type, it must equal the source's
///    (`ContentTypeMismatch` otherwise); if not explicit, the source's type is inherited.
/// 3. `fork_offset` defaults to the source tail and may not exceed it (`InvalidFork`).
/// 4. Source bytes `[0, fork_offset)` are read and become the fork's `initial_payload`.
/// 5. A fork reference is added on the source's owner *before* the fork is created.
/// 6. The fork is created with `close_after`, `stream_seq` and `producer` cleared, and it
///    inherits the source's TTL/expiry when the request set neither.
///
/// If the fork already exists, the reference taken in step 5 is released again and a
/// release failure is returned. If creation fails, the release is best effort and its
/// error is ignored in favour of the creation error.
///
/// # Panics
///
/// Panics if `request.forked_from` is `None`; `create_stream` only routes here when it is set.
async fn create_fork_stream(
    &self,
    mut request: CreateStreamRequest,
) -> Result<CreateStreamResponse, RuntimeError> {
    let source_id = request
        .forked_from
        .clone()
        .expect("forked_from checked before create_fork_stream");
    let now_ms = request.now_ms;
    let source_placement = self.shard_map.locate(&source_id);
    let source_head = self
        .head_stream(HeadStreamRequest {
            stream_id: source_id.clone(),
            now_ms,
        })
        .await
        .map_err(|err| map_fork_source_ref_error(err, source_placement))?;

    // Content type: an explicit one must match the source, otherwise inherit it.
    if request.content_type_explicit {
        if request.content_type != source_head.content_type {
            return Err(RuntimeError::group_engine(
                source_placement,
                GroupEngineError::stream(
                    StreamErrorCode::ContentTypeMismatch,
                    format!(
                        "fork content type '{}' does not match source content type '{}'",
                        request.content_type, source_head.content_type
                    ),
                ),
            ));
        }
    } else {
        request.content_type.clone_from(&source_head.content_type);
    }

    // Default to forking at the current tail; forking past the tail is rejected.
    let fork_offset = request.fork_offset.unwrap_or(source_head.tail_offset);
    if fork_offset > source_head.tail_offset {
        return Err(RuntimeError::group_engine(
            source_placement,
            GroupEngineError::stream(
                StreamErrorCode::InvalidFork,
                format!(
                    "fork offset {fork_offset} is beyond source stream '{}' tail {}",
                    source_id, source_head.tail_offset
                ),
            ),
        ));
    }

    // The source prefix is materialized in memory, so the offset must fit in `usize`.
    let max_len = usize::try_from(fork_offset).map_err(|_| {
        RuntimeError::group_engine(
            source_placement,
            GroupEngineError::stream(
                StreamErrorCode::InvalidFork,
                format!("fork offset {fork_offset} cannot fit in memory on this host"),
            ),
        )
    })?;
    // NOTE(review): assumes one `read_stream` call returns the whole `[0, fork_offset)`
    // prefix; a short read would give the fork less data than `fork_offset` implies —
    // confirm `read_stream` never truncates. Read errors are returned unmapped.
    request.initial_payload = if fork_offset == 0 {
        Bytes::new()
    } else {
        self.read_stream(ReadStreamRequest {
            stream_id: source_id.clone(),
            offset: 0,
            max_len,
            now_ms,
        })
        .await?
        .payload
        .into()
    };
    // Take the source reference before creating the fork; every later failure path
    // releases it again.
    self.add_fork_ref_on_owner(source_id.clone(), now_ms)
        .await
        .map_err(|err| map_fork_source_ref_error(err, source_placement))?;
    request.close_after = false;
    request.stream_seq = None;
    request.producer = None;
    // Inherit the source's lifetime only when the caller specified none.
    if request.stream_ttl_seconds.is_none() && request.stream_expires_at_ms.is_none() {
        request.stream_ttl_seconds = source_head.stream_ttl_seconds;
        request.stream_expires_at_ms = source_head.stream_expires_at_ms;
    }
    request.fork_offset = Some(fork_offset);
    match self.create_stream_on_owner(request).await {
        // The stream already existed, so the reference added above is surplus
        // (presumably the existing fork already holds its own); release it.
        Ok(response) if response.already_exists => {
            self.release_fork_ref_cascade(source_id).await?;
            Ok(response)
        }
        Ok(response) => Ok(response),
        // Best-effort release: the creation error is what the caller needs to see.
        Err(err) => {
            let _ = self.release_fork_ref_cascade(source_id).await;
            Err(err)
        }
    }
}
289
290    pub async fn head_stream(
291        &self,
292        request: HeadStreamRequest,
293    ) -> Result<HeadStreamResponse, RuntimeError> {
294        let placement = self.shard_map.locate(&request.stream_id);
295        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
296        let (response_tx, response_rx) = oneshot::channel();
297        self.send_core_command(
298            mailbox,
299            CoreCommand::HeadStream {
300                request,
301                placement,
302                response_tx,
303            },
304            response_rx,
305        )
306        .await
307    }
308
309    pub async fn read_stream(
310        &self,
311        request: ReadStreamRequest,
312    ) -> Result<ReadStreamResponse, RuntimeError> {
313        let placement = self.shard_map.locate(&request.stream_id);
314        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
315        let (response_tx, response_rx) = oneshot::channel();
316        self.send_core_command(
317            mailbox,
318            CoreCommand::ReadStream {
319                request,
320                placement,
321                response_tx,
322            },
323            response_rx,
324        )
325        .await
326    }
327
328    pub async fn publish_snapshot(
329        &self,
330        request: PublishSnapshotRequest,
331    ) -> Result<PublishSnapshotResponse, RuntimeError> {
332        let placement = self.shard_map.locate(&request.stream_id);
333        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
334        let (response_tx, response_rx) = oneshot::channel();
335        self.send_core_command(
336            mailbox,
337            CoreCommand::PublishSnapshot {
338                request,
339                placement,
340                response_tx,
341            },
342            response_rx,
343        )
344        .await
345    }
346
347    pub async fn read_snapshot(
348        &self,
349        request: ReadSnapshotRequest,
350    ) -> Result<ReadSnapshotResponse, RuntimeError> {
351        let placement = self.shard_map.locate(&request.stream_id);
352        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
353        let (response_tx, response_rx) = oneshot::channel();
354        self.send_core_command(
355            mailbox,
356            CoreCommand::ReadSnapshot {
357                request,
358                placement,
359                response_tx,
360            },
361            response_rx,
362        )
363        .await
364    }
365
366    pub async fn delete_snapshot(
367        &self,
368        request: DeleteSnapshotRequest,
369    ) -> Result<(), RuntimeError> {
370        let placement = self.shard_map.locate(&request.stream_id);
371        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
372        let (response_tx, response_rx) = oneshot::channel();
373        self.send_core_command(
374            mailbox,
375            CoreCommand::DeleteSnapshot {
376                request,
377                placement,
378                response_tx,
379            },
380            response_rx,
381        )
382        .await
383    }
384
385    pub async fn bootstrap_stream(
386        &self,
387        request: BootstrapStreamRequest,
388    ) -> Result<BootstrapStreamResponse, RuntimeError> {
389        let placement = self.shard_map.locate(&request.stream_id);
390        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
391        let (response_tx, response_rx) = oneshot::channel();
392        self.send_core_command(
393            mailbox,
394            CoreCommand::BootstrapStream {
395                request,
396                placement,
397                response_tx,
398            },
399            response_rx,
400        )
401        .await
402    }
403
404    pub async fn wait_read_stream(
405        &self,
406        request: ReadStreamRequest,
407    ) -> Result<ReadStreamResponse, RuntimeError> {
408        let placement = self.shard_map.locate(&request.stream_id);
409        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
410        let waiter_id = self.next_waiter_id.fetch_add(1, Ordering::Relaxed);
411        let stream_id = request.stream_id.clone();
412        let (response_tx, response_rx) = oneshot::channel();
413        self.enqueue_core_command(
414            mailbox,
415            CoreCommand::WaitRead {
416                request,
417                placement,
418                waiter_id,
419                response_tx,
420            },
421        )
422        .await?;
423        let mut cancel = WaitReadCancel::new(mailbox.tx.clone(), stream_id, placement, waiter_id);
424        let response = response_rx
425            .await
426            .map_err(|_| RuntimeError::ResponseDropped {
427                core_id: mailbox.core_id,
428            })?;
429        cancel.disarm();
430        response
431    }
432
433    pub async fn require_local_live_read_owner(
434        &self,
435        stream_id: &BucketStreamId,
436    ) -> Result<(), RuntimeError> {
437        let placement = self.shard_map.locate(stream_id);
438        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
439        let (response_tx, response_rx) = oneshot::channel();
440        self.send_core_command(
441            mailbox,
442            CoreCommand::RequireLiveReadOwner {
443                placement,
444                response_tx,
445            },
446            response_rx,
447        )
448        .await
449    }
450
451    pub async fn close_stream(
452        &self,
453        request: CloseStreamRequest,
454    ) -> Result<CloseStreamResponse, RuntimeError> {
455        let placement = self.shard_map.locate(&request.stream_id);
456        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
457        let (response_tx, response_rx) = oneshot::channel();
458        self.send_core_command(
459            mailbox,
460            CoreCommand::CloseStream {
461                request,
462                placement,
463                response_tx,
464            },
465            response_rx,
466        )
467        .await
468    }
469
470    pub async fn delete_stream(
471        &self,
472        request: DeleteStreamRequest,
473    ) -> Result<DeleteStreamResponse, RuntimeError> {
474        let response = self.delete_stream_on_owner(request).await?;
475        if let Some(parent_to_release) = response.parent_to_release.clone() {
476            self.release_fork_ref_cascade(parent_to_release).await?;
477        }
478        Ok(response)
479    }
480
481    async fn delete_stream_on_owner(
482        &self,
483        request: DeleteStreamRequest,
484    ) -> Result<DeleteStreamResponse, RuntimeError> {
485        let placement = self.shard_map.locate(&request.stream_id);
486        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
487        let (response_tx, response_rx) = oneshot::channel();
488        self.send_core_command(
489            mailbox,
490            CoreCommand::DeleteStream {
491                request,
492                placement,
493                response_tx,
494            },
495            response_rx,
496        )
497        .await
498    }
499
500    async fn add_fork_ref_on_owner(
501        &self,
502        stream_id: BucketStreamId,
503        now_ms: u64,
504    ) -> Result<ForkRefResponse, RuntimeError> {
505        let placement = self.shard_map.locate(&stream_id);
506        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
507        let (response_tx, response_rx) = oneshot::channel();
508        self.send_core_command(
509            mailbox,
510            CoreCommand::AddForkRef {
511                stream_id,
512                now_ms,
513                placement,
514                response_tx,
515            },
516            response_rx,
517        )
518        .await
519    }
520
521    async fn release_fork_ref_on_owner(
522        &self,
523        stream_id: BucketStreamId,
524    ) -> Result<ForkRefResponse, RuntimeError> {
525        let placement = self.shard_map.locate(&stream_id);
526        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
527        let (response_tx, response_rx) = oneshot::channel();
528        self.send_core_command(
529            mailbox,
530            CoreCommand::ReleaseForkRef {
531                stream_id,
532                placement,
533                response_tx,
534            },
535            response_rx,
536        )
537        .await
538    }
539
540    async fn release_fork_ref_cascade(
541        &self,
542        stream_id: BucketStreamId,
543    ) -> Result<(), RuntimeError> {
544        let mut next = Some(stream_id);
545        while let Some(current) = next {
546            let response = self.release_fork_ref_on_owner(current).await?;
547            next = response.parent_to_release;
548        }
549        Ok(())
550    }
551
552    pub async fn flush_cold(
553        &self,
554        request: FlushColdRequest,
555    ) -> Result<FlushColdResponse, RuntimeError> {
556        let placement = self.shard_map.locate(&request.stream_id);
557        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
558        let (response_tx, response_rx) = oneshot::channel();
559        self.send_core_command(
560            mailbox,
561            CoreCommand::FlushCold {
562                request,
563                placement,
564                response_tx,
565            },
566            response_rx,
567        )
568        .await
569    }
570
571    pub async fn append_external(
572        &self,
573        request: AppendExternalRequest,
574    ) -> Result<AppendResponse, RuntimeError> {
575        let placement = self.shard_map.locate(&request.stream_id);
576        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
577        let (response_tx, response_rx) = oneshot::channel();
578        self.send_core_command(
579            mailbox,
580            CoreCommand::AppendExternal {
581                request,
582                placement,
583                response_tx,
584            },
585            response_rx,
586        )
587        .await
588    }
589
590    pub async fn plan_cold_flush(
591        &self,
592        request: PlanColdFlushRequest,
593    ) -> Result<Option<ColdFlushCandidate>, RuntimeError> {
594        let placement = self.shard_map.locate(&request.stream_id);
595        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
596        let (response_tx, response_rx) = oneshot::channel();
597        self.send_core_command(
598            mailbox,
599            CoreCommand::PlanColdFlush {
600                request,
601                placement,
602                response_tx,
603            },
604            response_rx,
605        )
606        .await
607    }
608
609    pub async fn flush_cold_once(
610        &self,
611        request: PlanColdFlushRequest,
612    ) -> Result<Option<FlushColdResponse>, RuntimeError> {
613        let Some(candidate) = self.plan_cold_flush(request).await? else {
614            return Ok(None);
615        };
616        self.flush_cold_candidate(candidate).await.map(Some)
617    }
618
619    pub async fn plan_next_cold_flush(
620        &self,
621        raft_group_id: RaftGroupId,
622        request: PlanGroupColdFlushRequest,
623    ) -> Result<Option<ColdFlushCandidate>, RuntimeError> {
624        let placement = self.placement_for_group(raft_group_id)?;
625        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
626        let (response_tx, response_rx) = oneshot::channel();
627        self.send_core_command(
628            mailbox,
629            CoreCommand::PlanNextColdFlush {
630                request,
631                placement,
632                response_tx,
633            },
634            response_rx,
635        )
636        .await
637    }
638
639    pub async fn plan_next_cold_flush_batch(
640        &self,
641        raft_group_id: RaftGroupId,
642        request: PlanGroupColdFlushRequest,
643        max_candidates: usize,
644    ) -> Result<Vec<ColdFlushCandidate>, RuntimeError> {
645        let placement = self.placement_for_group(raft_group_id)?;
646        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
647        let (response_tx, response_rx) = oneshot::channel();
648        self.send_core_command(
649            mailbox,
650            CoreCommand::PlanNextColdFlushBatch {
651                request,
652                placement,
653                max_candidates,
654                response_tx,
655            },
656            response_rx,
657        )
658        .await
659    }
660
661    pub async fn flush_cold_group_once(
662        &self,
663        raft_group_id: RaftGroupId,
664        request: PlanGroupColdFlushRequest,
665    ) -> Result<Option<FlushColdResponse>, RuntimeError> {
666        let Some(candidate) = self.plan_next_cold_flush(raft_group_id, request).await? else {
667            return Ok(None);
668        };
669        match self.flush_cold_candidate(candidate).await {
670            Ok(response) => Ok(Some(response)),
671            Err(err) if is_stale_cold_flush_candidate_error(&err) => Ok(None),
672            Err(err) => Err(err),
673        }
674    }
675
676    pub async fn flush_cold_group_batch_once(
677        &self,
678        raft_group_id: RaftGroupId,
679        request: PlanGroupColdFlushRequest,
680        max_candidates: usize,
681    ) -> Result<Vec<FlushColdResponse>, RuntimeError> {
682        let candidates = self
683            .plan_next_cold_flush_batch(raft_group_id, request, max_candidates)
684            .await?;
685        if candidates.is_empty() {
686            return Ok(Vec::new());
687        }
688        match self.flush_cold_candidates_batch(candidates).await {
689            Ok(responses) => Ok(responses),
690            Err(err) if is_stale_cold_flush_candidate_error(&err) => Ok(Vec::new()),
691            Err(err) => Err(err),
692        }
693    }
694
695    async fn flush_cold_candidate(
696        &self,
697        candidate: ColdFlushCandidate,
698    ) -> Result<FlushColdResponse, RuntimeError> {
699        let Some(cold_store) = self.cold_store.as_ref() else {
700            return Err(RuntimeError::ColdStoreConfig {
701                message: "URSULA_COLD_BACKEND must be configured before flushing cold chunks"
702                    .to_owned(),
703            });
704        };
705        let path = new_cold_chunk_path(
706            &candidate.stream_id,
707            candidate.start_offset,
708            candidate.end_offset,
709        );
710        let upload_started_at = Instant::now();
711        let object_size = cold_store
712            .write_chunk(&path, &candidate.payload)
713            .await
714            .map_err(|err| RuntimeError::ColdStoreIo {
715                message: err.to_string(),
716            })?;
717        self.metrics
718            .record_cold_upload(object_size, elapsed_ns(upload_started_at));
719        let publish_started_at = Instant::now();
720        let publish = self
721            .flush_cold(FlushColdRequest {
722                stream_id: candidate.stream_id,
723                chunk: ColdChunkRef {
724                    start_offset: candidate.start_offset,
725                    end_offset: candidate.end_offset,
726                    s3_path: path.clone(),
727                    object_size,
728                },
729            })
730            .await;
731        match publish {
732            Ok(response) => {
733                self.metrics
734                    .record_cold_publish(object_size, elapsed_ns(publish_started_at));
735                Ok(response)
736            }
737            Err(err) => {
738                let cleanup_failed = cold_store.delete_chunk(&path).await.is_err();
739                self.metrics
740                    .record_cold_orphan_cleanup(object_size, cleanup_failed);
741                Err(err)
742            }
743        }
744    }
745
746    pub(crate) async fn flush_cold_candidates_batch(
747        &self,
748        candidates: Vec<ColdFlushCandidate>,
749    ) -> Result<Vec<FlushColdResponse>, RuntimeError> {
750        let Some(cold_store) = self.cold_store.as_ref() else {
751            return Err(RuntimeError::ColdStoreConfig {
752                message: "URSULA_COLD_BACKEND must be configured before flushing cold chunks"
753                    .to_owned(),
754            });
755        };
756        let mut requests = Vec::with_capacity(candidates.len());
757        let mut uploaded = Vec::with_capacity(candidates.len());
758        for candidate in candidates {
759            let path = new_cold_chunk_path(
760                &candidate.stream_id,
761                candidate.start_offset,
762                candidate.end_offset,
763            );
764            let upload_started_at = Instant::now();
765            let object_size = cold_store
766                .write_chunk(&path, &candidate.payload)
767                .await
768                .map_err(|err| RuntimeError::ColdStoreIo {
769                    message: err.to_string(),
770                })?;
771            self.metrics
772                .record_cold_upload(object_size, elapsed_ns(upload_started_at));
773            uploaded.push((path.clone(), object_size));
774            requests.push(FlushColdRequest {
775                stream_id: candidate.stream_id,
776                chunk: ColdChunkRef {
777                    start_offset: candidate.start_offset,
778                    end_offset: candidate.end_offset,
779                    s3_path: path,
780                    object_size,
781                },
782            });
783        }
784
785        let placement = self.shard_map.locate(&requests[0].stream_id);
786        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
787        let (response_tx, response_rx) = oneshot::channel();
788        let publish_started_at = Instant::now();
789        let publish = self
790            .send_core_command(
791                mailbox,
792                CoreCommand::FlushColdBatch {
793                    requests,
794                    placement,
795                    response_tx,
796                },
797                response_rx,
798            )
799            .await;
800        match publish {
801            Ok(responses) => {
802                let publish_ns = elapsed_ns(publish_started_at);
803                let per_chunk_publish_ns =
804                    publish_ns / u64::try_from(uploaded.len()).expect("uploaded len fits u64");
805                for (_, object_size) in &uploaded {
806                    self.metrics
807                        .record_cold_publish(*object_size, per_chunk_publish_ns);
808                }
809                Ok(responses)
810            }
811            Err(err) => {
812                for (path, object_size) in uploaded {
813                    let cleanup_failed = cold_store.delete_chunk(&path).await.is_err();
814                    self.metrics
815                        .record_cold_orphan_cleanup(object_size, cleanup_failed);
816                }
817                Err(err)
818            }
819        }
820    }
821
822    pub async fn flush_cold_all_groups_once(
823        &self,
824        request: PlanGroupColdFlushRequest,
825    ) -> Result<usize, RuntimeError> {
826        self.flush_cold_all_groups_once_bounded(request, 1).await
827    }
828
829    pub async fn flush_cold_all_groups_once_bounded(
830        &self,
831        request: PlanGroupColdFlushRequest,
832        max_concurrency: usize,
833    ) -> Result<usize, RuntimeError> {
834        let max_concurrency = max_concurrency.max(1);
835        if max_concurrency == 1 {
836            return self.flush_cold_all_groups_once_serial(request).await;
837        }
838        let mut flushed = 0;
839        let mut next_group_id = 0;
840        let group_count = self.shard_map.raft_group_count();
841        let mut tasks = JoinSet::new();
842
843        while next_group_id < group_count || !tasks.is_empty() {
844            while next_group_id < group_count && tasks.len() < max_concurrency {
845                let runtime = self.clone();
846                let request = request.clone();
847                let group_id = RaftGroupId(next_group_id);
848                next_group_id += 1;
849                tasks.spawn(async move {
850                    runtime
851                        .flush_cold_group_batch_once(
852                            group_id,
853                            request,
854                            COLD_FLUSH_GROUP_BATCH_MAX_CHUNKS,
855                        )
856                        .await
857                        .map(|responses| responses.len())
858                });
859            }
860            if let Some(result) = tasks.join_next().await {
861                match result {
862                    Ok(Ok(count)) => flushed += count,
863                    Ok(Err(err)) => return Err(err),
864                    Err(err) => {
865                        return Err(RuntimeError::ColdStoreIo {
866                            message: format!("cold flush task failed: {err}"),
867                        });
868                    }
869                }
870            }
871        }
872        Ok(flushed)
873    }
874
875    async fn flush_cold_all_groups_once_serial(
876        &self,
877        request: PlanGroupColdFlushRequest,
878    ) -> Result<usize, RuntimeError> {
879        let mut flushed = 0;
880        for group_id in 0..self.shard_map.raft_group_count() {
881            flushed += self
882                .flush_cold_group_batch_once(
883                    RaftGroupId(group_id),
884                    request.clone(),
885                    COLD_FLUSH_GROUP_BATCH_MAX_CHUNKS,
886                )
887                .await?
888                .len();
889        }
890        Ok(flushed)
891    }
892
893    pub async fn append(&self, request: AppendRequest) -> Result<AppendResponse, RuntimeError> {
894        if request.payload.is_empty() {
895            return Err(RuntimeError::EmptyAppend);
896        }
897        let placement = self.shard_map.locate(&request.stream_id);
898        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
899        let (response_tx, response_rx) = oneshot::channel();
900        self.send_core_command(
901            mailbox,
902            CoreCommand::Append {
903                request,
904                placement,
905                response_tx,
906            },
907            response_rx,
908        )
909        .await
910    }
911
912    pub async fn append_batch(
913        &self,
914        request: AppendBatchRequest,
915    ) -> Result<AppendBatchResponse, RuntimeError> {
916        if request.payloads.is_empty() {
917            return Err(RuntimeError::EmptyAppend);
918        }
919        let placement = self.shard_map.locate(&request.stream_id);
920        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
921        let (response_tx, response_rx) = oneshot::channel();
922        self.send_core_command(
923            mailbox,
924            CoreCommand::AppendBatch {
925                request,
926                placement,
927                response_tx,
928            },
929            response_rx,
930        )
931        .await
932    }
933
934    pub async fn snapshot_group(
935        &self,
936        raft_group_id: RaftGroupId,
937    ) -> Result<GroupSnapshot, RuntimeError> {
938        let placement = self.placement_for_group(raft_group_id)?;
939        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
940        let (response_tx, response_rx) = oneshot::channel();
941        self.send_core_command(
942            mailbox,
943            CoreCommand::SnapshotGroup {
944                placement,
945                response_tx,
946            },
947            response_rx,
948        )
949        .await
950    }
951
952    pub async fn install_group_snapshot(
953        &self,
954        snapshot: GroupSnapshot,
955    ) -> Result<(), RuntimeError> {
956        let expected = self.placement_for_group(snapshot.placement.raft_group_id)?;
957        if snapshot.placement != expected {
958            return Err(RuntimeError::SnapshotPlacementMismatch {
959                expected,
960                actual: snapshot.placement,
961            });
962        }
963        let mailbox = &self.mailboxes[usize::from(expected.core_id.0)];
964        let (response_tx, response_rx) = oneshot::channel();
965        self.send_core_command(
966            mailbox,
967            CoreCommand::InstallGroupSnapshot {
968                snapshot,
969                response_tx,
970            },
971            response_rx,
972        )
973        .await
974    }
975
976    pub async fn warm_group(
977        &self,
978        raft_group_id: RaftGroupId,
979    ) -> Result<ShardPlacement, RuntimeError> {
980        let placement = self.placement_for_group(raft_group_id)?;
981        let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
982        let (response_tx, response_rx) = oneshot::channel();
983        self.send_core_command(
984            mailbox,
985            CoreCommand::WarmGroup {
986                placement,
987                response_tx,
988            },
989            response_rx,
990        )
991        .await
992    }
993
994    pub async fn warm_all_groups(&self) -> Result<(), RuntimeError> {
995        for raw_group_id in 0..self.shard_map.raft_group_count() {
996            self.warm_group(RaftGroupId(raw_group_id)).await?;
997        }
998        Ok(())
999    }
1000
1001    fn placement_for_group(
1002        &self,
1003        raft_group_id: RaftGroupId,
1004    ) -> Result<ShardPlacement, RuntimeError> {
1005        if raft_group_id.0 >= self.shard_map.raft_group_count() {
1006            return Err(RuntimeError::InvalidRaftGroup {
1007                raft_group_id,
1008                raft_group_count: self.shard_map.raft_group_count(),
1009            });
1010        }
1011        Ok(ShardPlacement {
1012            core_id: CoreId(
1013                (raft_group_id.0 % u32::from(self.shard_map.core_count()))
1014                    .try_into()
1015                    .expect("core id fits u16"),
1016            ),
1017            shard_id: ShardId(raft_group_id.0),
1018            raft_group_id,
1019        })
1020    }
1021
1022    async fn send_core_command<T>(
1023        &self,
1024        mailbox: &CoreMailbox,
1025        command: CoreCommand,
1026        response_rx: oneshot::Receiver<Result<T, RuntimeError>>,
1027    ) -> Result<T, RuntimeError> {
1028        self.enqueue_core_command(mailbox, command).await?;
1029        response_rx
1030            .await
1031            .map_err(|_| RuntimeError::ResponseDropped {
1032                core_id: mailbox.core_id,
1033            })?
1034    }
1035
1036    async fn enqueue_core_command(
1037        &self,
1038        mailbox: &CoreMailbox,
1039        command: CoreCommand,
1040    ) -> Result<(), RuntimeError> {
1041        if mailbox.tx.capacity() == 0 {
1042            self.metrics.record_mailbox_full(mailbox.core_id);
1043        }
1044        let started_at = Instant::now();
1045        mailbox
1046            .tx
1047            .send(command)
1048            .await
1049            .map_err(|_| RuntimeError::MailboxClosed {
1050                core_id: mailbox.core_id,
1051            })?;
1052        self.metrics
1053            .record_routed_request(mailbox.core_id, elapsed_ns(started_at));
1054        Ok(())
1055    }
1056
1057    pub fn metrics(&self) -> RuntimeMetrics {
1058        RuntimeMetrics {
1059            inner: self.metrics.clone(),
1060        }
1061    }
1062
1063    pub fn mailbox_snapshot(&self) -> RuntimeMailboxSnapshot {
1064        let depths = self
1065            .mailboxes
1066            .iter()
1067            .map(CoreMailbox::depth)
1068            .collect::<Vec<_>>();
1069        let capacities = self
1070            .mailboxes
1071            .iter()
1072            .map(CoreMailbox::capacity)
1073            .collect::<Vec<_>>();
1074        RuntimeMailboxSnapshot { depths, capacities }
1075    }
1076}
1077
1078fn spawn_core_worker(threading: RuntimeThreading, worker: CoreWorker) -> Result<(), RuntimeError> {
1079    let core_id = worker.core_id;
1080    match threading {
1081        RuntimeThreading::HostedTokio => {
1082            tokio::spawn(worker.run());
1083            Ok(())
1084        }
1085        RuntimeThreading::ThreadPerCore => std::thread::Builder::new()
1086            .name(format!("ursula-core-{}", core_id.0))
1087            .spawn(move || {
1088                let runtime = tokio::runtime::Builder::new_current_thread()
1089                    .enable_all()
1090                    .build()
1091                    .expect("build per-core tokio runtime");
1092                runtime.block_on(worker.run());
1093            })
1094            .map(|_| ())
1095            .map_err(|err| RuntimeError::SpawnCoreThread {
1096                core_id,
1097                message: err.to_string(),
1098            }),
1099    }
1100}