1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::Instant;
5
6use bytes::Bytes;
7use tokio::sync::{Semaphore, mpsc, oneshot};
8use tokio::task::JoinSet;
9use ursula_shard::{BucketStreamId, CoreId, RaftGroupId, ShardId, ShardPlacement, StaticShardMap};
10use ursula_stream::{ColdChunkRef, ColdFlushCandidate, StreamErrorCode};
11
12use crate::cold_store::{ColdStoreHandle, new_cold_chunk_path};
13use crate::command::GroupSnapshot;
14use crate::core_worker::{CoreCommand, CoreMailbox, CoreWorker, WaitReadCancel};
15use crate::engine::{GroupEngineError, GroupEngineFactory};
16use crate::engine_in_memory::InMemoryGroupEngineFactory;
17use crate::error::{RuntimeError, map_fork_source_ref_error};
18use crate::metrics::{
19 COLD_FLUSH_GROUP_BATCH_MAX_CHUNKS, RuntimeMailboxSnapshot, RuntimeMetrics, RuntimeMetricsInner,
20 elapsed_ns, is_stale_cold_flush_candidate_error,
21};
22use crate::request::{
23 AppendBatchRequest, AppendBatchResponse, AppendExternalRequest, AppendRequest, AppendResponse,
24 BootstrapStreamRequest, BootstrapStreamResponse, CloseStreamRequest, CloseStreamResponse,
25 ColdWriteAdmission, CreateStreamExternalRequest, CreateStreamRequest, CreateStreamResponse,
26 DeleteSnapshotRequest, DeleteStreamRequest, DeleteStreamResponse, FlushColdRequest,
27 FlushColdResponse, ForkRefResponse, HeadStreamRequest, HeadStreamResponse,
28 PlanColdFlushRequest, PlanGroupColdFlushRequest, PublishSnapshotRequest,
29 PublishSnapshotResponse, ReadSnapshotRequest, ReadSnapshotResponse, ReadStreamRequest,
30 ReadStreamResponse,
31};
32
/// Tunables used when spawning a [`ShardRuntime`].
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Number of cores handed to the shard map; one core worker is spawned per core.
    pub core_count: usize,
    /// Number of raft groups handed to the shard map.
    pub raft_group_count: usize,
    /// Capacity of each core mailbox channel; the runtime treats values below one as one.
    pub mailbox_capacity: usize,
    /// How core workers are driven.
    pub threading: RuntimeThreading,
    /// Optional per-group hot byte limit forwarded to the workers' cold write admission.
    pub cold_max_hot_bytes_per_group: Option<u64>,
    /// Optional per-core limit on live-read waiters forwarded to every worker.
    pub live_read_max_waiters_per_core: Option<u64>,
}

impl RuntimeConfig {
    /// Creates a configuration with the given topology and the default tunables:
    /// a mailbox capacity of 1024, thread-per-core workers, no hot byte limit and
    /// at most 65 536 live-read waiters per core.
    pub fn new(core_count: usize, raft_group_count: usize) -> Self {
        Self {
            core_count,
            raft_group_count,
            mailbox_capacity: 1024,
            threading: RuntimeThreading::ThreadPerCore,
            cold_max_hot_bytes_per_group: None,
            live_read_max_waiters_per_core: Some(65_536),
        }
    }

    /// Returns the configuration with `cold_max_hot_bytes_per_group` replaced by `value`.
    pub fn with_cold_max_hot_bytes_per_group(self, value: Option<u64>) -> Self {
        Self {
            cold_max_hot_bytes_per_group: value,
            ..self
        }
    }

    /// Returns the configuration with `live_read_max_waiters_per_core` replaced by `value`.
    pub fn with_live_read_max_waiters_per_core(self, value: Option<u64>) -> Self {
        Self {
            live_read_max_waiters_per_core: value,
            ..self
        }
    }
}

/// Selects how the runtime drives its core workers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuntimeThreading {
    /// Every core worker runs on its own named OS thread.
    ThreadPerCore,
    /// Core workers are spawned as tasks on the hosting Tokio runtime.
    HostedTokio,
}
71
72#[derive(Debug, Clone)]
73pub struct ShardRuntime {
74 shard_map: StaticShardMap,
75 mailboxes: Vec<CoreMailbox>,
76 metrics: Arc<RuntimeMetricsInner>,
77 next_waiter_id: Arc<AtomicU64>,
78 cold_store: Option<ColdStoreHandle>,
79}
80
81impl ShardRuntime {
82 pub fn spawn(config: RuntimeConfig) -> Result<Self, RuntimeError> {
83 Self::spawn_with_engine_factory(config, InMemoryGroupEngineFactory::default())
84 }
85
86 pub fn spawn_with_engine_factory(
87 config: RuntimeConfig,
88 engine_factory: impl GroupEngineFactory,
89 ) -> Result<Self, RuntimeError> {
90 Self::spawn_with_engine_factory_and_cold_store(config, engine_factory, None)
91 }
92
93 pub fn spawn_with_engine_factory_and_cold_store(
94 config: RuntimeConfig,
95 engine_factory: impl GroupEngineFactory,
96 cold_store: Option<ColdStoreHandle>,
97 ) -> Result<Self, RuntimeError> {
98 let shard_map = StaticShardMap::new(config.core_count, config.raft_group_count)?;
99 let metrics = Arc::new(RuntimeMetricsInner::new(
100 usize::from(shard_map.core_count()),
101 usize::try_from(shard_map.raft_group_count()).expect("u32 fits usize"),
102 ));
103 let cold_write_admission = ColdWriteAdmission {
104 max_hot_bytes_per_group: config.cold_max_hot_bytes_per_group,
105 };
106 let engine_factory: Arc<dyn GroupEngineFactory> = Arc::new(engine_factory);
107 let read_materialization = Arc::new(Semaphore::new(config.mailbox_capacity.max(1)));
108 let mut mailboxes = Vec::with_capacity(usize::from(shard_map.core_count()));
109 for raw_core_id in 0..shard_map.core_count() {
110 let core_id = CoreId(raw_core_id);
111 let (tx, rx) = mpsc::channel(config.mailbox_capacity.max(1));
112 let worker = CoreWorker {
113 core_id,
114 rx,
115 engine_factory: engine_factory.clone(),
116 groups: HashMap::new(),
117 metrics: metrics.clone(),
118 group_mailbox_capacity: config.mailbox_capacity.max(1),
119 cold_write_admission,
120 live_read_max_waiters_per_core: config.live_read_max_waiters_per_core,
121 read_materialization: read_materialization.clone(),
122 };
123 spawn_core_worker(config.threading, worker)?;
124 mailboxes.push(CoreMailbox { core_id, tx });
125 }
126 Ok(Self {
127 shard_map,
128 mailboxes,
129 metrics,
130 next_waiter_id: Arc::new(AtomicU64::new(1)),
131 cold_store,
132 })
133 }
134
135 pub fn locate(&self, stream_id: &BucketStreamId) -> ShardPlacement {
136 self.shard_map.locate(stream_id)
137 }
138
139 pub fn has_cold_store(&self) -> bool {
140 self.cold_store.is_some()
141 }
142
143 pub fn cold_store(&self) -> Option<ColdStoreHandle> {
144 self.cold_store.clone()
145 }
146
147 pub async fn create_stream(
148 &self,
149 request: CreateStreamRequest,
150 ) -> Result<CreateStreamResponse, RuntimeError> {
151 if request.forked_from.is_some() {
152 return self.create_fork_stream(request).await;
153 }
154 self.create_stream_on_owner(request).await
155 }
156
157 pub async fn create_stream_external(
158 &self,
159 request: CreateStreamExternalRequest,
160 ) -> Result<CreateStreamResponse, RuntimeError> {
161 let placement = self.shard_map.locate(&request.stream_id);
162 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
163 let (response_tx, response_rx) = oneshot::channel();
164 self.send_core_command(
165 mailbox,
166 CoreCommand::CreateExternal {
167 request,
168 placement,
169 response_tx,
170 },
171 response_rx,
172 )
173 .await
174 }
175
176 async fn create_stream_on_owner(
177 &self,
178 request: CreateStreamRequest,
179 ) -> Result<CreateStreamResponse, RuntimeError> {
180 let placement = self.shard_map.locate(&request.stream_id);
181 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
182 let (response_tx, response_rx) = oneshot::channel();
183 self.send_core_command(
184 mailbox,
185 CoreCommand::CreateStream {
186 request,
187 placement,
188 response_tx,
189 },
190 response_rx,
191 )
192 .await
193 }
194
195 async fn create_fork_stream(
196 &self,
197 mut request: CreateStreamRequest,
198 ) -> Result<CreateStreamResponse, RuntimeError> {
199 let source_id = request
200 .forked_from
201 .clone()
202 .expect("forked_from checked before create_fork_stream");
203 let now_ms = request.now_ms;
204 let source_placement = self.shard_map.locate(&source_id);
205 let source_head = self
206 .head_stream(HeadStreamRequest {
207 stream_id: source_id.clone(),
208 now_ms,
209 })
210 .await
211 .map_err(|err| map_fork_source_ref_error(err, source_placement))?;
212
213 if request.content_type_explicit {
214 if request.content_type != source_head.content_type {
215 return Err(RuntimeError::group_engine(
216 source_placement,
217 GroupEngineError::stream(
218 StreamErrorCode::ContentTypeMismatch,
219 format!(
220 "fork content type '{}' does not match source content type '{}'",
221 request.content_type, source_head.content_type
222 ),
223 ),
224 ));
225 }
226 } else {
227 request.content_type.clone_from(&source_head.content_type);
228 }
229
230 let fork_offset = request.fork_offset.unwrap_or(source_head.tail_offset);
231 if fork_offset > source_head.tail_offset {
232 return Err(RuntimeError::group_engine(
233 source_placement,
234 GroupEngineError::stream(
235 StreamErrorCode::InvalidFork,
236 format!(
237 "fork offset {fork_offset} is beyond source stream '{}' tail {}",
238 source_id, source_head.tail_offset
239 ),
240 ),
241 ));
242 }
243
244 let max_len = usize::try_from(fork_offset).map_err(|_| {
245 RuntimeError::group_engine(
246 source_placement,
247 GroupEngineError::stream(
248 StreamErrorCode::InvalidFork,
249 format!("fork offset {fork_offset} cannot fit in memory on this host"),
250 ),
251 )
252 })?;
253 request.initial_payload = if fork_offset == 0 {
254 Bytes::new()
255 } else {
256 self.read_stream(ReadStreamRequest {
257 stream_id: source_id.clone(),
258 offset: 0,
259 max_len,
260 now_ms,
261 })
262 .await?
263 .payload
264 .into()
265 };
266 self.add_fork_ref_on_owner(source_id.clone(), now_ms)
267 .await
268 .map_err(|err| map_fork_source_ref_error(err, source_placement))?;
269 request.close_after = false;
270 request.stream_seq = None;
271 request.producer = None;
272 if request.stream_ttl_seconds.is_none() && request.stream_expires_at_ms.is_none() {
273 request.stream_ttl_seconds = source_head.stream_ttl_seconds;
274 request.stream_expires_at_ms = source_head.stream_expires_at_ms;
275 }
276 request.fork_offset = Some(fork_offset);
277 match self.create_stream_on_owner(request).await {
278 Ok(response) if response.already_exists => {
279 self.release_fork_ref_cascade(source_id).await?;
280 Ok(response)
281 }
282 Ok(response) => Ok(response),
283 Err(err) => {
284 let _ = self.release_fork_ref_cascade(source_id).await;
285 Err(err)
286 }
287 }
288 }
289
290 pub async fn head_stream(
291 &self,
292 request: HeadStreamRequest,
293 ) -> Result<HeadStreamResponse, RuntimeError> {
294 let placement = self.shard_map.locate(&request.stream_id);
295 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
296 let (response_tx, response_rx) = oneshot::channel();
297 self.send_core_command(
298 mailbox,
299 CoreCommand::HeadStream {
300 request,
301 placement,
302 response_tx,
303 },
304 response_rx,
305 )
306 .await
307 }
308
309 pub async fn read_stream(
310 &self,
311 request: ReadStreamRequest,
312 ) -> Result<ReadStreamResponse, RuntimeError> {
313 let placement = self.shard_map.locate(&request.stream_id);
314 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
315 let (response_tx, response_rx) = oneshot::channel();
316 self.send_core_command(
317 mailbox,
318 CoreCommand::ReadStream {
319 request,
320 placement,
321 response_tx,
322 },
323 response_rx,
324 )
325 .await
326 }
327
328 pub async fn publish_snapshot(
329 &self,
330 request: PublishSnapshotRequest,
331 ) -> Result<PublishSnapshotResponse, RuntimeError> {
332 let placement = self.shard_map.locate(&request.stream_id);
333 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
334 let (response_tx, response_rx) = oneshot::channel();
335 self.send_core_command(
336 mailbox,
337 CoreCommand::PublishSnapshot {
338 request,
339 placement,
340 response_tx,
341 },
342 response_rx,
343 )
344 .await
345 }
346
347 pub async fn read_snapshot(
348 &self,
349 request: ReadSnapshotRequest,
350 ) -> Result<ReadSnapshotResponse, RuntimeError> {
351 let placement = self.shard_map.locate(&request.stream_id);
352 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
353 let (response_tx, response_rx) = oneshot::channel();
354 self.send_core_command(
355 mailbox,
356 CoreCommand::ReadSnapshot {
357 request,
358 placement,
359 response_tx,
360 },
361 response_rx,
362 )
363 .await
364 }
365
366 pub async fn delete_snapshot(
367 &self,
368 request: DeleteSnapshotRequest,
369 ) -> Result<(), RuntimeError> {
370 let placement = self.shard_map.locate(&request.stream_id);
371 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
372 let (response_tx, response_rx) = oneshot::channel();
373 self.send_core_command(
374 mailbox,
375 CoreCommand::DeleteSnapshot {
376 request,
377 placement,
378 response_tx,
379 },
380 response_rx,
381 )
382 .await
383 }
384
385 pub async fn bootstrap_stream(
386 &self,
387 request: BootstrapStreamRequest,
388 ) -> Result<BootstrapStreamResponse, RuntimeError> {
389 let placement = self.shard_map.locate(&request.stream_id);
390 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
391 let (response_tx, response_rx) = oneshot::channel();
392 self.send_core_command(
393 mailbox,
394 CoreCommand::BootstrapStream {
395 request,
396 placement,
397 response_tx,
398 },
399 response_rx,
400 )
401 .await
402 }
403
404 pub async fn wait_read_stream(
405 &self,
406 request: ReadStreamRequest,
407 ) -> Result<ReadStreamResponse, RuntimeError> {
408 let placement = self.shard_map.locate(&request.stream_id);
409 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
410 let waiter_id = self.next_waiter_id.fetch_add(1, Ordering::Relaxed);
411 let stream_id = request.stream_id.clone();
412 let (response_tx, response_rx) = oneshot::channel();
413 self.enqueue_core_command(
414 mailbox,
415 CoreCommand::WaitRead {
416 request,
417 placement,
418 waiter_id,
419 response_tx,
420 },
421 )
422 .await?;
423 let mut cancel = WaitReadCancel::new(mailbox.tx.clone(), stream_id, placement, waiter_id);
424 let response = response_rx
425 .await
426 .map_err(|_| RuntimeError::ResponseDropped {
427 core_id: mailbox.core_id,
428 })?;
429 cancel.disarm();
430 response
431 }
432
433 pub async fn require_local_live_read_owner(
434 &self,
435 stream_id: &BucketStreamId,
436 ) -> Result<(), RuntimeError> {
437 let placement = self.shard_map.locate(stream_id);
438 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
439 let (response_tx, response_rx) = oneshot::channel();
440 self.send_core_command(
441 mailbox,
442 CoreCommand::RequireLiveReadOwner {
443 placement,
444 response_tx,
445 },
446 response_rx,
447 )
448 .await
449 }
450
451 pub async fn close_stream(
452 &self,
453 request: CloseStreamRequest,
454 ) -> Result<CloseStreamResponse, RuntimeError> {
455 let placement = self.shard_map.locate(&request.stream_id);
456 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
457 let (response_tx, response_rx) = oneshot::channel();
458 self.send_core_command(
459 mailbox,
460 CoreCommand::CloseStream {
461 request,
462 placement,
463 response_tx,
464 },
465 response_rx,
466 )
467 .await
468 }
469
470 pub async fn delete_stream(
471 &self,
472 request: DeleteStreamRequest,
473 ) -> Result<DeleteStreamResponse, RuntimeError> {
474 let response = self.delete_stream_on_owner(request).await?;
475 if let Some(parent_to_release) = response.parent_to_release.clone() {
476 self.release_fork_ref_cascade(parent_to_release).await?;
477 }
478 Ok(response)
479 }
480
481 async fn delete_stream_on_owner(
482 &self,
483 request: DeleteStreamRequest,
484 ) -> Result<DeleteStreamResponse, RuntimeError> {
485 let placement = self.shard_map.locate(&request.stream_id);
486 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
487 let (response_tx, response_rx) = oneshot::channel();
488 self.send_core_command(
489 mailbox,
490 CoreCommand::DeleteStream {
491 request,
492 placement,
493 response_tx,
494 },
495 response_rx,
496 )
497 .await
498 }
499
500 async fn add_fork_ref_on_owner(
501 &self,
502 stream_id: BucketStreamId,
503 now_ms: u64,
504 ) -> Result<ForkRefResponse, RuntimeError> {
505 let placement = self.shard_map.locate(&stream_id);
506 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
507 let (response_tx, response_rx) = oneshot::channel();
508 self.send_core_command(
509 mailbox,
510 CoreCommand::AddForkRef {
511 stream_id,
512 now_ms,
513 placement,
514 response_tx,
515 },
516 response_rx,
517 )
518 .await
519 }
520
521 async fn release_fork_ref_on_owner(
522 &self,
523 stream_id: BucketStreamId,
524 ) -> Result<ForkRefResponse, RuntimeError> {
525 let placement = self.shard_map.locate(&stream_id);
526 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
527 let (response_tx, response_rx) = oneshot::channel();
528 self.send_core_command(
529 mailbox,
530 CoreCommand::ReleaseForkRef {
531 stream_id,
532 placement,
533 response_tx,
534 },
535 response_rx,
536 )
537 .await
538 }
539
540 async fn release_fork_ref_cascade(
541 &self,
542 stream_id: BucketStreamId,
543 ) -> Result<(), RuntimeError> {
544 let mut next = Some(stream_id);
545 while let Some(current) = next {
546 let response = self.release_fork_ref_on_owner(current).await?;
547 next = response.parent_to_release;
548 }
549 Ok(())
550 }
551
552 pub async fn flush_cold(
553 &self,
554 request: FlushColdRequest,
555 ) -> Result<FlushColdResponse, RuntimeError> {
556 let placement = self.shard_map.locate(&request.stream_id);
557 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
558 let (response_tx, response_rx) = oneshot::channel();
559 self.send_core_command(
560 mailbox,
561 CoreCommand::FlushCold {
562 request,
563 placement,
564 response_tx,
565 },
566 response_rx,
567 )
568 .await
569 }
570
571 pub async fn append_external(
572 &self,
573 request: AppendExternalRequest,
574 ) -> Result<AppendResponse, RuntimeError> {
575 let placement = self.shard_map.locate(&request.stream_id);
576 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
577 let (response_tx, response_rx) = oneshot::channel();
578 self.send_core_command(
579 mailbox,
580 CoreCommand::AppendExternal {
581 request,
582 placement,
583 response_tx,
584 },
585 response_rx,
586 )
587 .await
588 }
589
590 pub async fn plan_cold_flush(
591 &self,
592 request: PlanColdFlushRequest,
593 ) -> Result<Option<ColdFlushCandidate>, RuntimeError> {
594 let placement = self.shard_map.locate(&request.stream_id);
595 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
596 let (response_tx, response_rx) = oneshot::channel();
597 self.send_core_command(
598 mailbox,
599 CoreCommand::PlanColdFlush {
600 request,
601 placement,
602 response_tx,
603 },
604 response_rx,
605 )
606 .await
607 }
608
609 pub async fn flush_cold_once(
610 &self,
611 request: PlanColdFlushRequest,
612 ) -> Result<Option<FlushColdResponse>, RuntimeError> {
613 let Some(candidate) = self.plan_cold_flush(request).await? else {
614 return Ok(None);
615 };
616 self.flush_cold_candidate(candidate).await.map(Some)
617 }
618
619 pub async fn plan_next_cold_flush(
620 &self,
621 raft_group_id: RaftGroupId,
622 request: PlanGroupColdFlushRequest,
623 ) -> Result<Option<ColdFlushCandidate>, RuntimeError> {
624 let placement = self.placement_for_group(raft_group_id)?;
625 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
626 let (response_tx, response_rx) = oneshot::channel();
627 self.send_core_command(
628 mailbox,
629 CoreCommand::PlanNextColdFlush {
630 request,
631 placement,
632 response_tx,
633 },
634 response_rx,
635 )
636 .await
637 }
638
639 pub async fn plan_next_cold_flush_batch(
640 &self,
641 raft_group_id: RaftGroupId,
642 request: PlanGroupColdFlushRequest,
643 max_candidates: usize,
644 ) -> Result<Vec<ColdFlushCandidate>, RuntimeError> {
645 let placement = self.placement_for_group(raft_group_id)?;
646 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
647 let (response_tx, response_rx) = oneshot::channel();
648 self.send_core_command(
649 mailbox,
650 CoreCommand::PlanNextColdFlushBatch {
651 request,
652 placement,
653 max_candidates,
654 response_tx,
655 },
656 response_rx,
657 )
658 .await
659 }
660
661 pub async fn flush_cold_group_once(
662 &self,
663 raft_group_id: RaftGroupId,
664 request: PlanGroupColdFlushRequest,
665 ) -> Result<Option<FlushColdResponse>, RuntimeError> {
666 let Some(candidate) = self.plan_next_cold_flush(raft_group_id, request).await? else {
667 return Ok(None);
668 };
669 match self.flush_cold_candidate(candidate).await {
670 Ok(response) => Ok(Some(response)),
671 Err(err) if is_stale_cold_flush_candidate_error(&err) => Ok(None),
672 Err(err) => Err(err),
673 }
674 }
675
676 pub async fn flush_cold_group_batch_once(
677 &self,
678 raft_group_id: RaftGroupId,
679 request: PlanGroupColdFlushRequest,
680 max_candidates: usize,
681 ) -> Result<Vec<FlushColdResponse>, RuntimeError> {
682 let candidates = self
683 .plan_next_cold_flush_batch(raft_group_id, request, max_candidates)
684 .await?;
685 if candidates.is_empty() {
686 return Ok(Vec::new());
687 }
688 match self.flush_cold_candidates_batch(candidates).await {
689 Ok(responses) => Ok(responses),
690 Err(err) if is_stale_cold_flush_candidate_error(&err) => Ok(Vec::new()),
691 Err(err) => Err(err),
692 }
693 }
694
695 async fn flush_cold_candidate(
696 &self,
697 candidate: ColdFlushCandidate,
698 ) -> Result<FlushColdResponse, RuntimeError> {
699 let Some(cold_store) = self.cold_store.as_ref() else {
700 return Err(RuntimeError::ColdStoreConfig {
701 message: "URSULA_COLD_BACKEND must be configured before flushing cold chunks"
702 .to_owned(),
703 });
704 };
705 let path = new_cold_chunk_path(
706 &candidate.stream_id,
707 candidate.start_offset,
708 candidate.end_offset,
709 );
710 let upload_started_at = Instant::now();
711 let object_size = cold_store
712 .write_chunk(&path, &candidate.payload)
713 .await
714 .map_err(|err| RuntimeError::ColdStoreIo {
715 message: err.to_string(),
716 })?;
717 self.metrics
718 .record_cold_upload(object_size, elapsed_ns(upload_started_at));
719 let publish_started_at = Instant::now();
720 let publish = self
721 .flush_cold(FlushColdRequest {
722 stream_id: candidate.stream_id,
723 chunk: ColdChunkRef {
724 start_offset: candidate.start_offset,
725 end_offset: candidate.end_offset,
726 s3_path: path.clone(),
727 object_size,
728 },
729 })
730 .await;
731 match publish {
732 Ok(response) => {
733 self.metrics
734 .record_cold_publish(object_size, elapsed_ns(publish_started_at));
735 Ok(response)
736 }
737 Err(err) => {
738 let cleanup_failed = cold_store.delete_chunk(&path).await.is_err();
739 self.metrics
740 .record_cold_orphan_cleanup(object_size, cleanup_failed);
741 Err(err)
742 }
743 }
744 }
745
746 pub(crate) async fn flush_cold_candidates_batch(
747 &self,
748 candidates: Vec<ColdFlushCandidate>,
749 ) -> Result<Vec<FlushColdResponse>, RuntimeError> {
750 let Some(cold_store) = self.cold_store.as_ref() else {
751 return Err(RuntimeError::ColdStoreConfig {
752 message: "URSULA_COLD_BACKEND must be configured before flushing cold chunks"
753 .to_owned(),
754 });
755 };
756 let mut requests = Vec::with_capacity(candidates.len());
757 let mut uploaded = Vec::with_capacity(candidates.len());
758 for candidate in candidates {
759 let path = new_cold_chunk_path(
760 &candidate.stream_id,
761 candidate.start_offset,
762 candidate.end_offset,
763 );
764 let upload_started_at = Instant::now();
765 let object_size = cold_store
766 .write_chunk(&path, &candidate.payload)
767 .await
768 .map_err(|err| RuntimeError::ColdStoreIo {
769 message: err.to_string(),
770 })?;
771 self.metrics
772 .record_cold_upload(object_size, elapsed_ns(upload_started_at));
773 uploaded.push((path.clone(), object_size));
774 requests.push(FlushColdRequest {
775 stream_id: candidate.stream_id,
776 chunk: ColdChunkRef {
777 start_offset: candidate.start_offset,
778 end_offset: candidate.end_offset,
779 s3_path: path,
780 object_size,
781 },
782 });
783 }
784
785 let placement = self.shard_map.locate(&requests[0].stream_id);
786 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
787 let (response_tx, response_rx) = oneshot::channel();
788 let publish_started_at = Instant::now();
789 let publish = self
790 .send_core_command(
791 mailbox,
792 CoreCommand::FlushColdBatch {
793 requests,
794 placement,
795 response_tx,
796 },
797 response_rx,
798 )
799 .await;
800 match publish {
801 Ok(responses) => {
802 let publish_ns = elapsed_ns(publish_started_at);
803 let per_chunk_publish_ns =
804 publish_ns / u64::try_from(uploaded.len()).expect("uploaded len fits u64");
805 for (_, object_size) in &uploaded {
806 self.metrics
807 .record_cold_publish(*object_size, per_chunk_publish_ns);
808 }
809 Ok(responses)
810 }
811 Err(err) => {
812 for (path, object_size) in uploaded {
813 let cleanup_failed = cold_store.delete_chunk(&path).await.is_err();
814 self.metrics
815 .record_cold_orphan_cleanup(object_size, cleanup_failed);
816 }
817 Err(err)
818 }
819 }
820 }
821
822 pub async fn flush_cold_all_groups_once(
823 &self,
824 request: PlanGroupColdFlushRequest,
825 ) -> Result<usize, RuntimeError> {
826 self.flush_cold_all_groups_once_bounded(request, 1).await
827 }
828
829 pub async fn flush_cold_all_groups_once_bounded(
830 &self,
831 request: PlanGroupColdFlushRequest,
832 max_concurrency: usize,
833 ) -> Result<usize, RuntimeError> {
834 let max_concurrency = max_concurrency.max(1);
835 if max_concurrency == 1 {
836 return self.flush_cold_all_groups_once_serial(request).await;
837 }
838 let mut flushed = 0;
839 let mut next_group_id = 0;
840 let group_count = self.shard_map.raft_group_count();
841 let mut tasks = JoinSet::new();
842
843 while next_group_id < group_count || !tasks.is_empty() {
844 while next_group_id < group_count && tasks.len() < max_concurrency {
845 let runtime = self.clone();
846 let request = request.clone();
847 let group_id = RaftGroupId(next_group_id);
848 next_group_id += 1;
849 tasks.spawn(async move {
850 runtime
851 .flush_cold_group_batch_once(
852 group_id,
853 request,
854 COLD_FLUSH_GROUP_BATCH_MAX_CHUNKS,
855 )
856 .await
857 .map(|responses| responses.len())
858 });
859 }
860 if let Some(result) = tasks.join_next().await {
861 match result {
862 Ok(Ok(count)) => flushed += count,
863 Ok(Err(err)) => return Err(err),
864 Err(err) => {
865 return Err(RuntimeError::ColdStoreIo {
866 message: format!("cold flush task failed: {err}"),
867 });
868 }
869 }
870 }
871 }
872 Ok(flushed)
873 }
874
875 async fn flush_cold_all_groups_once_serial(
876 &self,
877 request: PlanGroupColdFlushRequest,
878 ) -> Result<usize, RuntimeError> {
879 let mut flushed = 0;
880 for group_id in 0..self.shard_map.raft_group_count() {
881 flushed += self
882 .flush_cold_group_batch_once(
883 RaftGroupId(group_id),
884 request.clone(),
885 COLD_FLUSH_GROUP_BATCH_MAX_CHUNKS,
886 )
887 .await?
888 .len();
889 }
890 Ok(flushed)
891 }
892
893 pub async fn append(&self, request: AppendRequest) -> Result<AppendResponse, RuntimeError> {
894 if request.payload.is_empty() {
895 return Err(RuntimeError::EmptyAppend);
896 }
897 let placement = self.shard_map.locate(&request.stream_id);
898 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
899 let (response_tx, response_rx) = oneshot::channel();
900 self.send_core_command(
901 mailbox,
902 CoreCommand::Append {
903 request,
904 placement,
905 response_tx,
906 },
907 response_rx,
908 )
909 .await
910 }
911
912 pub async fn append_batch(
913 &self,
914 request: AppendBatchRequest,
915 ) -> Result<AppendBatchResponse, RuntimeError> {
916 if request.payloads.is_empty() {
917 return Err(RuntimeError::EmptyAppend);
918 }
919 let placement = self.shard_map.locate(&request.stream_id);
920 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
921 let (response_tx, response_rx) = oneshot::channel();
922 self.send_core_command(
923 mailbox,
924 CoreCommand::AppendBatch {
925 request,
926 placement,
927 response_tx,
928 },
929 response_rx,
930 )
931 .await
932 }
933
934 pub async fn snapshot_group(
935 &self,
936 raft_group_id: RaftGroupId,
937 ) -> Result<GroupSnapshot, RuntimeError> {
938 let placement = self.placement_for_group(raft_group_id)?;
939 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
940 let (response_tx, response_rx) = oneshot::channel();
941 self.send_core_command(
942 mailbox,
943 CoreCommand::SnapshotGroup {
944 placement,
945 response_tx,
946 },
947 response_rx,
948 )
949 .await
950 }
951
952 pub async fn install_group_snapshot(
953 &self,
954 snapshot: GroupSnapshot,
955 ) -> Result<(), RuntimeError> {
956 let expected = self.placement_for_group(snapshot.placement.raft_group_id)?;
957 if snapshot.placement != expected {
958 return Err(RuntimeError::SnapshotPlacementMismatch {
959 expected,
960 actual: snapshot.placement,
961 });
962 }
963 let mailbox = &self.mailboxes[usize::from(expected.core_id.0)];
964 let (response_tx, response_rx) = oneshot::channel();
965 self.send_core_command(
966 mailbox,
967 CoreCommand::InstallGroupSnapshot {
968 snapshot,
969 response_tx,
970 },
971 response_rx,
972 )
973 .await
974 }
975
976 pub async fn warm_group(
977 &self,
978 raft_group_id: RaftGroupId,
979 ) -> Result<ShardPlacement, RuntimeError> {
980 let placement = self.placement_for_group(raft_group_id)?;
981 let mailbox = &self.mailboxes[usize::from(placement.core_id.0)];
982 let (response_tx, response_rx) = oneshot::channel();
983 self.send_core_command(
984 mailbox,
985 CoreCommand::WarmGroup {
986 placement,
987 response_tx,
988 },
989 response_rx,
990 )
991 .await
992 }
993
994 pub async fn warm_all_groups(&self) -> Result<(), RuntimeError> {
995 for raw_group_id in 0..self.shard_map.raft_group_count() {
996 self.warm_group(RaftGroupId(raw_group_id)).await?;
997 }
998 Ok(())
999 }
1000
1001 fn placement_for_group(
1002 &self,
1003 raft_group_id: RaftGroupId,
1004 ) -> Result<ShardPlacement, RuntimeError> {
1005 if raft_group_id.0 >= self.shard_map.raft_group_count() {
1006 return Err(RuntimeError::InvalidRaftGroup {
1007 raft_group_id,
1008 raft_group_count: self.shard_map.raft_group_count(),
1009 });
1010 }
1011 Ok(ShardPlacement {
1012 core_id: CoreId(
1013 (raft_group_id.0 % u32::from(self.shard_map.core_count()))
1014 .try_into()
1015 .expect("core id fits u16"),
1016 ),
1017 shard_id: ShardId(raft_group_id.0),
1018 raft_group_id,
1019 })
1020 }
1021
1022 async fn send_core_command<T>(
1023 &self,
1024 mailbox: &CoreMailbox,
1025 command: CoreCommand,
1026 response_rx: oneshot::Receiver<Result<T, RuntimeError>>,
1027 ) -> Result<T, RuntimeError> {
1028 self.enqueue_core_command(mailbox, command).await?;
1029 response_rx
1030 .await
1031 .map_err(|_| RuntimeError::ResponseDropped {
1032 core_id: mailbox.core_id,
1033 })?
1034 }
1035
1036 async fn enqueue_core_command(
1037 &self,
1038 mailbox: &CoreMailbox,
1039 command: CoreCommand,
1040 ) -> Result<(), RuntimeError> {
1041 if mailbox.tx.capacity() == 0 {
1042 self.metrics.record_mailbox_full(mailbox.core_id);
1043 }
1044 let started_at = Instant::now();
1045 mailbox
1046 .tx
1047 .send(command)
1048 .await
1049 .map_err(|_| RuntimeError::MailboxClosed {
1050 core_id: mailbox.core_id,
1051 })?;
1052 self.metrics
1053 .record_routed_request(mailbox.core_id, elapsed_ns(started_at));
1054 Ok(())
1055 }
1056
1057 pub fn metrics(&self) -> RuntimeMetrics {
1058 RuntimeMetrics {
1059 inner: self.metrics.clone(),
1060 }
1061 }
1062
1063 pub fn mailbox_snapshot(&self) -> RuntimeMailboxSnapshot {
1064 let depths = self
1065 .mailboxes
1066 .iter()
1067 .map(CoreMailbox::depth)
1068 .collect::<Vec<_>>();
1069 let capacities = self
1070 .mailboxes
1071 .iter()
1072 .map(CoreMailbox::capacity)
1073 .collect::<Vec<_>>();
1074 RuntimeMailboxSnapshot { depths, capacities }
1075 }
1076}
1077
1078fn spawn_core_worker(threading: RuntimeThreading, worker: CoreWorker) -> Result<(), RuntimeError> {
1079 let core_id = worker.core_id;
1080 match threading {
1081 RuntimeThreading::HostedTokio => {
1082 tokio::spawn(worker.run());
1083 Ok(())
1084 }
1085 RuntimeThreading::ThreadPerCore => std::thread::Builder::new()
1086 .name(format!("ursula-core-{}", core_id.0))
1087 .spawn(move || {
1088 let runtime = tokio::runtime::Builder::new_current_thread()
1089 .enable_all()
1090 .build()
1091 .expect("build per-core tokio runtime");
1092 runtime.block_on(worker.run());
1093 })
1094 .map(|_| ())
1095 .map_err(|err| RuntimeError::SpawnCoreThread {
1096 core_id,
1097 message: err.to_string(),
1098 }),
1099 }
1100}