1use crate::restart::{RestartIntensity, RestartPolicy, RestartStrategy, RestartTracker};
72use crate::supervisor_common::{WorkerTermination, run_worker};
73use crate::types::{ChildExitReason, ChildId, ChildInfo, ChildType, WorkerContext};
74use crate::worker::Worker;
75use std::fmt;
76use std::sync::Arc;
77use tokio::sync::{mpsc, oneshot};
78use tokio::task::JoinHandle;
79
80pub(crate) struct StatefulWorkerSpec<W: Worker> {
86 pub id: ChildId,
87 pub worker_factory: Arc<dyn Fn(Arc<WorkerContext>) -> W + Send + Sync>,
88 pub restart_policy: RestartPolicy,
89 pub context: Arc<WorkerContext>,
90}
91
92impl<W: Worker> Clone for StatefulWorkerSpec<W> {
93 fn clone(&self) -> Self {
94 Self {
95 id: self.id.clone(),
96 worker_factory: Arc::clone(&self.worker_factory),
97 restart_policy: self.restart_policy,
98 context: Arc::clone(&self.context),
99 }
100 }
101}
102
103impl<W: Worker> StatefulWorkerSpec<W> {
104 pub(crate) fn new(
105 id: impl Into<String>,
106 factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
107 restart_policy: RestartPolicy,
108 context: Arc<WorkerContext>,
109 ) -> Self {
110 Self {
111 id: id.into(),
112 worker_factory: Arc::new(factory),
113 restart_policy,
114 context,
115 }
116 }
117
118 pub(crate) fn create_worker(&self) -> W {
119 (self.worker_factory)(Arc::clone(&self.context))
120 }
121}
122
123impl<W: Worker> fmt::Debug for StatefulWorkerSpec<W> {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 f.debug_struct("StatefulWorkerSpec")
126 .field("id", &self.id)
127 .field("restart_policy", &self.restart_policy)
128 .finish_non_exhaustive()
129 }
130}
131
132pub(crate) struct StatefulWorkerProcess<W: Worker> {
138 pub spec: StatefulWorkerSpec<W>,
139 pub handle: Option<JoinHandle<()>>,
140}
141
142impl<W: Worker> StatefulWorkerProcess<W> {
143 pub(crate) fn spawn<Cmd>(
144 spec: StatefulWorkerSpec<W>,
145 supervisor_name: String,
146 control_tx: mpsc::UnboundedSender<Cmd>,
147 ) -> Self
148 where
149 Cmd: From<WorkerTermination> + Send + 'static,
150 {
151 let worker = spec.create_worker();
152 let worker_id = spec.id.clone();
153 let handle = tokio::spawn(async move {
154 run_worker(supervisor_name, worker_id, worker, control_tx, None).await;
155 });
156
157 Self {
158 spec,
159 handle: Some(handle),
160 }
161 }
162
163 pub(crate) fn spawn_with_link<Cmd>(
165 spec: StatefulWorkerSpec<W>,
166 supervisor_name: String,
167 control_tx: mpsc::UnboundedSender<Cmd>,
168 init_tx: tokio::sync::oneshot::Sender<Result<(), String>>,
169 ) -> Self
170 where
171 Cmd: From<WorkerTermination> + Send + 'static,
172 {
173 let worker = spec.create_worker();
174 let worker_id = spec.id.clone();
175 let handle = tokio::spawn(async move {
176 run_worker(
177 supervisor_name,
178 worker_id,
179 worker,
180 control_tx,
181 Some(init_tx),
182 )
183 .await;
184 });
185
186 Self {
187 spec,
188 handle: Some(handle),
189 }
190 }
191
192 pub(crate) async fn stop(&mut self) {
193 if let Some(handle) = self.handle.take() {
194 handle.abort();
195 drop(handle.await);
197 }
198 }
199}
200
201impl<W: Worker> Drop for StatefulWorkerProcess<W> {
202 fn drop(&mut self) {
203 if let Some(handle) = self.handle.take() {
204 handle.abort();
205 }
206 }
207}
208
209pub(crate) enum StatefulChildSpec<W: Worker> {
215 Worker(StatefulWorkerSpec<W>),
216 Supervisor(Arc<StatefulSupervisorSpec<W>>),
217}
218
219impl<W: Worker> Clone for StatefulChildSpec<W> {
220 fn clone(&self) -> Self {
221 match self {
222 StatefulChildSpec::Worker(w) => StatefulChildSpec::Worker(w.clone()),
223 StatefulChildSpec::Supervisor(s) => StatefulChildSpec::Supervisor(Arc::clone(s)),
224 }
225 }
226}
227
228pub struct StatefulSupervisorSpec<W: Worker> {
230 pub(crate) name: String,
231 pub(crate) children: Vec<StatefulChildSpec<W>>,
232 pub(crate) restart_strategy: RestartStrategy,
233 pub(crate) restart_intensity: RestartIntensity,
234 pub(crate) context: Arc<WorkerContext>,
235}
236
237impl<W: Worker> Clone for StatefulSupervisorSpec<W> {
238 fn clone(&self) -> Self {
239 Self {
240 name: self.name.clone(),
241 children: self.children.clone(),
242 restart_strategy: self.restart_strategy,
243 restart_intensity: self.restart_intensity,
244 context: Arc::clone(&self.context),
245 }
246 }
247}
248
249impl<W: Worker> StatefulSupervisorSpec<W> {
250 pub fn new(name: impl Into<String>) -> Self {
253 Self {
254 name: name.into(),
255 children: Vec::new(),
256 restart_strategy: RestartStrategy::default(),
257 restart_intensity: RestartIntensity::default(),
258 context: Arc::new(WorkerContext::new()),
259 }
260 }
261
262 #[must_use]
264 pub fn with_restart_strategy(mut self, strategy: RestartStrategy) -> Self {
265 self.restart_strategy = strategy;
266 self
267 }
268
269 #[must_use]
271 pub fn with_restart_intensity(mut self, intensity: RestartIntensity) -> Self {
272 self.restart_intensity = intensity;
273 self
274 }
275
276 #[must_use]
279 pub fn with_worker(
280 mut self,
281 id: impl Into<String>,
282 factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
283 restart_policy: RestartPolicy,
284 ) -> Self {
285 self.children
286 .push(StatefulChildSpec::Worker(StatefulWorkerSpec::new(
287 id,
288 factory,
289 restart_policy,
290 Arc::clone(&self.context),
291 )));
292 self
293 }
294
295 #[must_use]
297 pub fn with_supervisor(mut self, supervisor: StatefulSupervisorSpec<W>) -> Self {
298 self.children
299 .push(StatefulChildSpec::Supervisor(Arc::new(supervisor)));
300 self
301 }
302
303 #[must_use]
305 pub fn context(&self) -> &Arc<WorkerContext> {
306 &self.context
307 }
308}
309
310pub(crate) enum StatefulChild<W: Worker> {
316 Worker(StatefulWorkerProcess<W>),
317 Supervisor {
318 handle: StatefulSupervisorHandle<W>,
319 spec: Arc<StatefulSupervisorSpec<W>>,
320 },
321}
322
323impl<W: Worker> StatefulChild<W> {
324 #[inline]
325 pub fn id(&self) -> &str {
326 match self {
327 StatefulChild::Worker(w) => &w.spec.id,
328 StatefulChild::Supervisor { spec, .. } => &spec.name,
329 }
330 }
331
332 #[inline]
333 pub fn child_type(&self) -> ChildType {
334 match self {
335 StatefulChild::Worker(_) => ChildType::Worker,
336 StatefulChild::Supervisor { .. } => ChildType::Supervisor,
337 }
338 }
339
340 #[inline]
341 #[allow(clippy::unnecessary_wraps)]
342 pub fn restart_policy(&self) -> Option<RestartPolicy> {
343 match self {
344 StatefulChild::Worker(w) => Some(w.spec.restart_policy),
345 StatefulChild::Supervisor { .. } => Some(RestartPolicy::Permanent),
346 }
347 }
348
349 pub async fn shutdown(&mut self) {
350 match self {
351 StatefulChild::Worker(w) => w.stop().await,
352 StatefulChild::Supervisor { handle, .. } => {
353 let _shutdown_result = handle.shutdown().await;
354 }
355 }
356 }
357}
358
359pub(crate) enum StatefulRestartInfo<W: Worker> {
361 Worker(StatefulWorkerSpec<W>),
362 Supervisor(Arc<StatefulSupervisorSpec<W>>),
363}
364
/// Errors reported by stateful supervisor operations.
#[derive(Debug)]
pub enum StatefulSupervisorError {
    /// The named supervisor has no children.
    NoChildren(String),
    /// All children of the named supervisor failed (restart intensity limit exceeded).
    AllChildrenFailed(String),
    /// The named supervisor is shutting down and does not accept the operation.
    ShuttingDown(String),
    /// A child with this id is already registered.
    ChildAlreadyExists(String),
    /// No child with this id is registered.
    ChildNotFound(String),
    /// A linked child reported an initialization failure.
    InitializationFailed {
        /// Id of the child that failed to initialize.
        child_id: String,
        /// Failure description.
        reason: String,
    },
    /// A linked child did not finish initialization within the timeout.
    InitializationTimeout {
        /// Id of the child that timed out.
        child_id: String,
        /// The timeout that elapsed.
        timeout: std::time::Duration,
    },
}

/// Human-readable messages, one per variant.
impl fmt::Display for StatefulSupervisorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NoChildren(name) => write!(f, "stateful supervisor '{name}' has no children"),
            Self::AllChildrenFailed(name) => write!(
                f,
                "all children failed for stateful supervisor '{name}' - restart intensity limit exceeded"
            ),
            Self::ShuttingDown(name) => write!(
                f,
                "stateful supervisor '{name}' is shutting down - operation not permitted"
            ),
            Self::ChildAlreadyExists(id) => write!(
                f,
                "child with id '{id}' already exists - use a unique identifier"
            ),
            Self::ChildNotFound(id) => write!(
                f,
                "child with id '{id}' not found - it may have already terminated"
            ),
            Self::InitializationFailed { child_id, reason } => {
                write!(f, "child '{child_id}' initialization failed: {reason}")
            }
            Self::InitializationTimeout { child_id, timeout } => write!(
                f,
                "child '{child_id}' initialization timed out after {timeout:?}"
            ),
        }
    }
}

impl std::error::Error for StatefulSupervisorError {}
442
443pub(crate) enum StatefulSupervisorCommand<W: Worker> {
449 StartChild {
450 spec: StatefulWorkerSpec<W>,
451 respond_to: oneshot::Sender<Result<ChildId, StatefulSupervisorError>>,
452 },
453 StartChildLinked {
454 spec: StatefulWorkerSpec<W>,
455 timeout: std::time::Duration,
456 respond_to: oneshot::Sender<Result<ChildId, StatefulSupervisorError>>,
457 },
458 TerminateChild {
459 id: ChildId,
460 respond_to: oneshot::Sender<Result<(), StatefulSupervisorError>>,
461 },
462 WhichChildren {
463 respond_to: oneshot::Sender<Result<Vec<ChildInfo>, StatefulSupervisorError>>,
464 },
465 GetRestartStrategy {
466 respond_to: oneshot::Sender<RestartStrategy>,
467 },
468 GetUptime {
469 respond_to: oneshot::Sender<u64>,
470 },
471 ChildTerminated {
472 id: ChildId,
473 reason: ChildExitReason,
474 },
475 Shutdown,
476}
477
478impl<W: Worker> From<WorkerTermination> for StatefulSupervisorCommand<W> {
479 fn from(term: WorkerTermination) -> Self {
480 StatefulSupervisorCommand::ChildTerminated {
481 id: term.id,
482 reason: term.reason,
483 }
484 }
485}
486
487pub(crate) struct StatefulSupervisorRuntime<W: Worker> {
489 name: String,
490 children: Vec<StatefulChild<W>>,
491 control_rx: mpsc::UnboundedReceiver<StatefulSupervisorCommand<W>>,
492 control_tx: mpsc::UnboundedSender<StatefulSupervisorCommand<W>>,
493 restart_strategy: RestartStrategy,
494 restart_tracker: RestartTracker,
495 created_at: std::time::Instant,
496}
497
498impl<W: Worker> StatefulSupervisorRuntime<W> {
499 pub(crate) fn new(
500 spec: StatefulSupervisorSpec<W>,
501 control_rx: mpsc::UnboundedReceiver<StatefulSupervisorCommand<W>>,
502 control_tx: mpsc::UnboundedSender<StatefulSupervisorCommand<W>>,
503 ) -> Self {
504 let mut children = Vec::with_capacity(spec.children.len());
505
506 for child_spec in spec.children {
507 match child_spec {
508 StatefulChildSpec::Worker(worker_spec) => {
509 let worker = StatefulWorkerProcess::spawn(
510 worker_spec,
511 spec.name.clone(),
512 control_tx.clone(),
513 );
514 children.push(StatefulChild::Worker(worker));
515 }
516 StatefulChildSpec::Supervisor(supervisor_spec) => {
517 let supervisor = StatefulSupervisorHandle::start((*supervisor_spec).clone());
518 children.push(StatefulChild::Supervisor {
519 handle: supervisor,
520 spec: Arc::clone(&supervisor_spec),
521 });
522 }
523 }
524 }
525
526 Self {
527 name: spec.name,
528 children,
529 control_rx,
530 control_tx,
531 restart_strategy: spec.restart_strategy,
532 restart_tracker: RestartTracker::new(spec.restart_intensity),
533 created_at: std::time::Instant::now(),
534 }
535 }
536
537 pub(crate) async fn run(mut self) {
538 while let Some(command) = self.control_rx.recv().await {
539 match command {
540 StatefulSupervisorCommand::StartChild { spec, respond_to } => {
541 let result = self.handle_start_child(spec);
542 let _send = respond_to.send(result);
543 }
544 StatefulSupervisorCommand::StartChildLinked {
545 spec,
546 timeout,
547 respond_to,
548 } => {
549 let result = self.handle_start_child_linked(spec, timeout).await;
550 let _send = respond_to.send(result);
551 }
552 StatefulSupervisorCommand::TerminateChild { id, respond_to } => {
553 let result = self.handle_terminate_child(&id).await;
554 let _send = respond_to.send(result);
555 }
556 StatefulSupervisorCommand::WhichChildren { respond_to } => {
557 let result = self.handle_which_children();
558 let _send = respond_to.send(result);
559 }
560 StatefulSupervisorCommand::GetRestartStrategy { respond_to } => {
561 let _send = respond_to.send(self.restart_strategy);
562 }
563 StatefulSupervisorCommand::GetUptime { respond_to } => {
564 let uptime = self.created_at.elapsed().as_secs();
565 let _send = respond_to.send(uptime);
566 }
567 StatefulSupervisorCommand::ChildTerminated { id, reason } => {
568 self.handle_child_terminated(id, reason).await;
569 }
570 StatefulSupervisorCommand::Shutdown => {
571 self.shutdown_children().await;
572 return;
573 }
574 }
575 }
576
577 self.shutdown_children().await;
578 }
579
580 fn handle_start_child(
581 &mut self,
582 spec: StatefulWorkerSpec<W>,
583 ) -> Result<ChildId, StatefulSupervisorError> {
584 if self.children.iter().any(|c| c.id() == spec.id) {
586 return Err(StatefulSupervisorError::ChildAlreadyExists(spec.id.clone()));
587 }
588
589 let id = spec.id.clone();
590 let worker = StatefulWorkerProcess::spawn(spec, self.name.clone(), self.control_tx.clone());
591
592 self.children.push(StatefulChild::Worker(worker));
593 tracing::debug!(
594 supervisor = %self.name,
595 child = %id,
596 "dynamically started child"
597 );
598
599 Ok(id)
600 }
601
602 async fn handle_start_child_linked(
603 &mut self,
604 spec: StatefulWorkerSpec<W>,
605 timeout: std::time::Duration,
606 ) -> Result<ChildId, StatefulSupervisorError> {
607 if self.children.iter().any(|c| c.id() == spec.id) {
609 return Err(StatefulSupervisorError::ChildAlreadyExists(spec.id.clone()));
610 }
611
612 let id = spec.id.clone();
613 let (init_tx, init_rx) = oneshot::channel();
614
615 let worker = StatefulWorkerProcess::spawn_with_link(
616 spec,
617 self.name.clone(),
618 self.control_tx.clone(),
619 init_tx,
620 );
621
622 let init_result = tokio::time::timeout(timeout, init_rx).await;
624
625 match init_result {
626 Ok(Ok(Ok(()))) => {
627 self.children.push(StatefulChild::Worker(worker));
629 tracing::debug!(
630 supervisor = %self.name,
631 child = %id,
632 "linked child started successfully"
633 );
634 Ok(id)
635 }
636 Ok(Ok(Err(reason))) => {
637 tracing::error!(
639 supervisor = %self.name,
640 child = %id,
641 reason = %reason,
642 "linked child initialization failed"
643 );
644 Err(StatefulSupervisorError::InitializationFailed {
646 child_id: id,
647 reason,
648 })
649 }
650 Ok(Err(_)) => {
651 tracing::error!(
653 supervisor = %self.name,
654 child = %id,
655 "linked child panicked during initialization"
656 );
657 Err(StatefulSupervisorError::InitializationFailed {
658 child_id: id,
659 reason: "worker panicked during initialization".to_owned(),
660 })
661 }
662 Err(_) => {
663 tracing::error!(
665 supervisor = %self.name,
666 child = %id,
667 timeout_secs = ?timeout.as_secs(),
668 "linked child initialization timed out"
669 );
670 Err(StatefulSupervisorError::InitializationTimeout {
671 child_id: id,
672 timeout,
673 })
674 }
675 }
676 }
677
678 async fn handle_terminate_child(&mut self, id: &str) -> Result<(), StatefulSupervisorError> {
679 let position = self
680 .children
681 .iter()
682 .position(|c| c.id() == id)
683 .ok_or_else(|| StatefulSupervisorError::ChildNotFound(id.to_owned()))?;
684
685 let mut child = self.children.remove(position);
686 child.shutdown().await;
687
688 tracing::debug!(
689 supervisor = %self.name,
690 child = %id,
691 "terminated child"
692 );
693 Ok(())
694 }
695
696 #[allow(clippy::unnecessary_wraps)]
697 fn handle_which_children(&self) -> Result<Vec<ChildInfo>, StatefulSupervisorError> {
698 let info = self
699 .children
700 .iter()
701 .map(|child| ChildInfo {
702 id: child.id().to_owned(),
703 child_type: child.child_type(),
704 restart_policy: child.restart_policy(),
705 })
706 .collect();
707
708 Ok(info)
709 }
710
711 #[allow(clippy::indexing_slicing)]
712 async fn handle_child_terminated(&mut self, id: ChildId, reason: ChildExitReason) {
713 tracing::debug!(
714 supervisor = %self.name,
715 child = %id,
716 reason = ?reason,
717 "child terminated"
718 );
719
720 let Some(position) = self.children.iter().position(|c| c.id() == id) else {
721 tracing::warn!(
722 supervisor = %self.name,
723 child = %id,
724 "terminated child not found in list"
725 );
726 return;
727 };
728
729 let should_restart = match &self.children[position] {
731 StatefulChild::Worker(w) => match w.spec.restart_policy {
732 RestartPolicy::Permanent => true,
733 RestartPolicy::Temporary => false,
734 RestartPolicy::Transient => reason == ChildExitReason::Abnormal,
735 },
736 StatefulChild::Supervisor { .. } => true, };
738
739 if !should_restart {
740 tracing::debug!(
741 supervisor = %self.name,
742 child = %id,
743 policy = ?self.children[position].restart_policy(),
744 reason = ?reason,
745 "not restarting child"
746 );
747 self.children.remove(position);
748 return;
749 }
750
751 if self.restart_tracker.record_restart() {
753 tracing::error!(
754 supervisor = %self.name,
755 "restart intensity exceeded, shutting down"
756 );
757 self.shutdown_children().await;
758 return;
759 }
760
761 match self.restart_strategy {
763 RestartStrategy::OneForOne => {
764 self.restart_child(position).await;
765 }
766 RestartStrategy::OneForAll => {
767 self.restart_all_children().await;
768 }
769 RestartStrategy::RestForOne => {
770 self.restart_from(position).await;
771 }
772 }
773 }
774
775 #[allow(clippy::indexing_slicing)]
776 async fn restart_child(&mut self, position: usize) {
777 let restart_info = match &self.children[position] {
779 StatefulChild::Worker(worker) => StatefulRestartInfo::Worker(worker.spec.clone()),
780 StatefulChild::Supervisor { spec, .. } => {
781 StatefulRestartInfo::Supervisor(Arc::clone(spec))
782 }
783 };
784
785 self.children[position].shutdown().await;
787
788 match restart_info {
790 StatefulRestartInfo::Worker(spec) => {
791 tracing::debug!(
792 supervisor = %self.name,
793 worker = %spec.id,
794 "restarting worker"
795 );
796 let new_worker = StatefulWorkerProcess::spawn(
797 spec.clone(),
798 self.name.clone(),
799 self.control_tx.clone(),
800 );
801 self.children[position] = StatefulChild::Worker(new_worker);
802 tracing::debug!(
803 supervisor = %self.name,
804 worker = %spec.id,
805 "worker restarted"
806 );
807 }
808 StatefulRestartInfo::Supervisor(spec) => {
809 let name = spec.name.clone();
810 tracing::debug!(
811 supervisor = %self.name,
812 child_supervisor = %name,
813 "restarting supervisor"
814 );
815 let new_handle = StatefulSupervisorHandle::start((*spec).clone());
816 self.children[position] = StatefulChild::Supervisor {
817 handle: new_handle,
818 spec,
819 };
820 tracing::debug!(
821 supervisor = %self.name,
822 child_supervisor = %name,
823 "supervisor restarted"
824 );
825 }
826 }
827 }
828
829 async fn restart_all_children(&mut self) {
830 tracing::debug!(
831 supervisor = %self.name,
832 "restarting all children (one_for_all)"
833 );
834
835 for child in &mut self.children {
837 child.shutdown().await;
838 }
839
840 for child in &mut self.children {
842 if let StatefulChild::Worker(worker) = child {
843 let spec = worker.spec.clone();
844 let new_worker = StatefulWorkerProcess::spawn(
845 spec.clone(),
846 self.name.clone(),
847 self.control_tx.clone(),
848 );
849 *child = StatefulChild::Worker(new_worker);
850 tracing::debug!(
851 supervisor = %self.name,
852 child = %spec.id,
853 "child restarted"
854 );
855 }
856 }
857 }
858
859 #[allow(clippy::indexing_slicing)]
860 async fn restart_from(&mut self, position: usize) {
861 tracing::debug!(
862 supervisor = %self.name,
863 position = %position,
864 "restarting from position (rest_for_one)"
865 );
866
867 for i in position..self.children.len() {
868 self.children[i].shutdown().await;
869
870 if let StatefulChild::Worker(worker) = &self.children[i] {
871 let spec = worker.spec.clone();
872 let new_worker = StatefulWorkerProcess::spawn(
873 spec.clone(),
874 self.name.clone(),
875 self.control_tx.clone(),
876 );
877 self.children[i] = StatefulChild::Worker(new_worker);
878 tracing::debug!(
879 supervisor = %self.name,
880 child = %spec.id,
881 "child restarted"
882 );
883 }
884 }
885 }
886
887 async fn shutdown_children(&mut self) {
888 for mut child in self.children.drain(..) {
889 let id = child.id().to_owned();
890 child.shutdown().await;
891 tracing::debug!(
892 supervisor = %self.name,
893 child = %id,
894 "shut down child"
895 );
896 }
897 }
898}
899
900#[derive(Clone)]
906pub struct StatefulSupervisorHandle<W: Worker> {
907 pub(crate) name: Arc<String>,
908 pub(crate) control_tx: mpsc::UnboundedSender<StatefulSupervisorCommand<W>>,
909}
910
911impl<W: Worker> StatefulSupervisorHandle<W> {
912 #[must_use]
914 pub fn start(spec: StatefulSupervisorSpec<W>) -> Self {
915 let (control_tx, control_rx) = mpsc::unbounded_channel();
916 let name_arc = Arc::new(spec.name.clone());
917 let runtime = StatefulSupervisorRuntime::new(spec, control_rx, control_tx.clone());
918
919 let runtime_name = Arc::clone(&name_arc);
920 tokio::spawn(async move {
921 runtime.run().await;
922 tracing::debug!(name = %*runtime_name, "supervisor stopped");
923 });
924
925 Self {
926 name: name_arc,
927 control_tx,
928 }
929 }
930
931 pub async fn start_child(
937 &self,
938 id: impl Into<String>,
939 factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
940 restart_policy: RestartPolicy,
941 context: Arc<WorkerContext>,
942 ) -> Result<ChildId, StatefulSupervisorError> {
943 let (result_tx, result_rx) = oneshot::channel();
944 let spec = StatefulWorkerSpec::new(id, factory, restart_policy, context);
945
946 self.control_tx
947 .send(StatefulSupervisorCommand::StartChild {
948 spec,
949 respond_to: result_tx,
950 })
951 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
952
953 result_rx
954 .await
955 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
956 }
957
958 pub async fn start_child_linked(
983 &self,
984 id: impl Into<String>,
985 factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
986 restart_policy: RestartPolicy,
987 context: Arc<WorkerContext>,
988 timeout: std::time::Duration,
989 ) -> Result<ChildId, StatefulSupervisorError> {
990 let (result_tx, result_rx) = oneshot::channel();
991 let spec = StatefulWorkerSpec::new(id, factory, restart_policy, context);
992
993 self.control_tx
994 .send(StatefulSupervisorCommand::StartChildLinked {
995 spec,
996 timeout,
997 respond_to: result_tx,
998 })
999 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1000
1001 result_rx
1002 .await
1003 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
1004 }
1005
1006 pub async fn terminate_child(&self, id: &str) -> Result<(), StatefulSupervisorError> {
1012 let (result_tx, result_rx) = oneshot::channel();
1013
1014 self.control_tx
1015 .send(StatefulSupervisorCommand::TerminateChild {
1016 id: id.to_owned(),
1017 respond_to: result_tx,
1018 })
1019 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1020
1021 result_rx
1022 .await
1023 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
1024 }
1025
1026 pub async fn which_children(&self) -> Result<Vec<ChildInfo>, StatefulSupervisorError> {
1032 let (result_tx, result_rx) = oneshot::channel();
1033
1034 self.control_tx
1035 .send(StatefulSupervisorCommand::WhichChildren {
1036 respond_to: result_tx,
1037 })
1038 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1039
1040 result_rx
1041 .await
1042 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
1043 }
1044
1045 #[allow(clippy::unused_async)]
1051 pub async fn shutdown(&self) -> Result<(), StatefulSupervisorError> {
1052 self.control_tx
1053 .send(StatefulSupervisorCommand::Shutdown)
1054 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1055 Ok(())
1056 }
1057
1058 #[must_use]
1060 pub fn name(&self) -> &str {
1061 self.name.as_str()
1062 }
1063
1064 pub async fn restart_strategy(&self) -> Result<RestartStrategy, StatefulSupervisorError> {
1070 let (result_tx, result_rx) = oneshot::channel();
1071
1072 self.control_tx
1073 .send(StatefulSupervisorCommand::GetRestartStrategy {
1074 respond_to: result_tx,
1075 })
1076 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1077
1078 result_rx
1079 .await
1080 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))
1081 }
1082
1083 pub async fn uptime(&self) -> Result<u64, StatefulSupervisorError> {
1089 let (result_tx, result_rx) = oneshot::channel();
1090
1091 self.control_tx
1092 .send(StatefulSupervisorCommand::GetUptime {
1093 respond_to: result_tx,
1094 })
1095 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1096
1097 result_rx
1098 .await
1099 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))
1100 }
1101}