1use crate::restart::{RestartIntensity, RestartPolicy, RestartStrategy, RestartTracker};
72use crate::supervisor_common::{WorkerTermination, run_worker};
73use crate::types::{ChildExitReason, ChildId, ChildInfo, ChildType, WorkerContext};
74use crate::worker::Worker;
75use std::fmt;
76use std::sync::Arc;
77use tokio::sync::{mpsc, oneshot};
78use tokio::task::JoinHandle;
79
80pub(crate) struct StatefulWorkerSpec<W: Worker> {
86 pub id: ChildId,
87 pub worker_factory: Arc<dyn Fn(Arc<WorkerContext>) -> W + Send + Sync>,
88 pub restart_policy: RestartPolicy,
89 pub context: Arc<WorkerContext>,
90}
91
92impl<W: Worker> Clone for StatefulWorkerSpec<W> {
93 fn clone(&self) -> Self {
94 Self {
95 id: self.id.clone(),
96 worker_factory: Arc::clone(&self.worker_factory),
97 restart_policy: self.restart_policy,
98 context: Arc::clone(&self.context),
99 }
100 }
101}
102
103impl<W: Worker> StatefulWorkerSpec<W> {
104 pub(crate) fn new(
105 id: impl Into<String>,
106 factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
107 restart_policy: RestartPolicy,
108 context: Arc<WorkerContext>,
109 ) -> Self {
110 Self {
111 id: id.into(),
112 worker_factory: Arc::new(factory),
113 restart_policy,
114 context,
115 }
116 }
117
118 pub(crate) fn create_worker(&self) -> W {
119 (self.worker_factory)(Arc::clone(&self.context))
120 }
121}
122
123impl<W: Worker> fmt::Debug for StatefulWorkerSpec<W> {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 f.debug_struct("StatefulWorkerSpec")
126 .field("id", &self.id)
127 .field("restart_policy", &self.restart_policy)
128 .finish_non_exhaustive()
129 }
130}
131
132pub(crate) struct StatefulWorkerProcess<W: Worker> {
138 pub spec: StatefulWorkerSpec<W>,
139 pub handle: Option<JoinHandle<()>>,
140}
141
142impl<W: Worker> StatefulWorkerProcess<W> {
143 pub(crate) fn spawn<Cmd>(
144 spec: StatefulWorkerSpec<W>,
145 supervisor_name: String,
146 control_tx: mpsc::UnboundedSender<Cmd>,
147 ) -> Self
148 where
149 Cmd: From<WorkerTermination> + Send + 'static,
150 {
151 let worker = spec.create_worker();
152 let worker_id = spec.id.clone();
153 let handle = tokio::spawn(async move {
154 run_worker(supervisor_name, worker_id, worker, control_tx, None).await;
155 });
156
157 Self {
158 spec,
159 handle: Some(handle),
160 }
161 }
162
163 pub(crate) fn spawn_with_link<Cmd>(
165 spec: StatefulWorkerSpec<W>,
166 supervisor_name: String,
167 control_tx: mpsc::UnboundedSender<Cmd>,
168 init_tx: tokio::sync::oneshot::Sender<Result<(), String>>,
169 ) -> Self
170 where
171 Cmd: From<WorkerTermination> + Send + 'static,
172 {
173 let worker = spec.create_worker();
174 let worker_id = spec.id.clone();
175 let handle = tokio::spawn(async move {
176 run_worker(
177 supervisor_name,
178 worker_id,
179 worker,
180 control_tx,
181 Some(init_tx),
182 )
183 .await;
184 });
185
186 Self {
187 spec,
188 handle: Some(handle),
189 }
190 }
191
192 pub(crate) async fn stop(&mut self) {
193 if let Some(handle) = self.handle.take() {
194 handle.abort();
195 drop(handle.await);
197 }
198 }
199}
200
201impl<W: Worker> Drop for StatefulWorkerProcess<W> {
202 fn drop(&mut self) {
203 if let Some(handle) = self.handle.take() {
204 handle.abort();
205 }
206 }
207}
208
209pub(crate) enum StatefulChildSpec<W: Worker> {
215 Worker(StatefulWorkerSpec<W>),
216 Supervisor(Arc<StatefulSupervisorSpec<W>>),
217}
218
219impl<W: Worker> Clone for StatefulChildSpec<W> {
220 fn clone(&self) -> Self {
221 match self {
222 StatefulChildSpec::Worker(w) => StatefulChildSpec::Worker(w.clone()),
223 StatefulChildSpec::Supervisor(s) => StatefulChildSpec::Supervisor(Arc::clone(s)),
224 }
225 }
226}
227
228pub struct StatefulSupervisorSpec<W: Worker> {
230 pub(crate) name: String,
231 pub(crate) children: Vec<StatefulChildSpec<W>>,
232 pub(crate) restart_strategy: RestartStrategy,
233 pub(crate) restart_intensity: RestartIntensity,
234 pub(crate) context: Arc<WorkerContext>,
235}
236
237impl<W: Worker> Clone for StatefulSupervisorSpec<W> {
238 fn clone(&self) -> Self {
239 Self {
240 name: self.name.clone(),
241 children: self.children.clone(),
242 restart_strategy: self.restart_strategy,
243 restart_intensity: self.restart_intensity,
244 context: Arc::clone(&self.context),
245 }
246 }
247}
248
249impl<W: Worker> StatefulSupervisorSpec<W> {
250 pub fn new(name: impl Into<String>) -> Self {
253 Self {
254 name: name.into(),
255 children: Vec::new(),
256 restart_strategy: RestartStrategy::default(),
257 restart_intensity: RestartIntensity::default(),
258 context: Arc::new(WorkerContext::new()),
259 }
260 }
261
262 pub fn with_restart_strategy(mut self, strategy: RestartStrategy) -> Self {
264 self.restart_strategy = strategy;
265 self
266 }
267
268 pub fn with_restart_intensity(mut self, intensity: RestartIntensity) -> Self {
270 self.restart_intensity = intensity;
271 self
272 }
273
274 pub fn with_worker(
277 mut self,
278 id: impl Into<String>,
279 factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
280 restart_policy: RestartPolicy,
281 ) -> Self {
282 self.children
283 .push(StatefulChildSpec::Worker(StatefulWorkerSpec::new(
284 id,
285 factory,
286 restart_policy,
287 Arc::clone(&self.context),
288 )));
289 self
290 }
291
292 pub fn with_supervisor(mut self, supervisor: StatefulSupervisorSpec<W>) -> Self {
294 self.children
295 .push(StatefulChildSpec::Supervisor(Arc::new(supervisor)));
296 self
297 }
298
299 pub fn context(&self) -> &Arc<WorkerContext> {
301 &self.context
302 }
303}
304
305pub(crate) enum StatefulChild<W: Worker> {
311 Worker(StatefulWorkerProcess<W>),
312 Supervisor {
313 handle: StatefulSupervisorHandle<W>,
314 spec: Arc<StatefulSupervisorSpec<W>>,
315 },
316}
317
318impl<W: Worker> StatefulChild<W> {
319 #[inline]
320 pub fn id(&self) -> &str {
321 match self {
322 StatefulChild::Worker(w) => &w.spec.id,
323 StatefulChild::Supervisor { spec, .. } => &spec.name,
324 }
325 }
326
327 #[inline]
328 pub fn child_type(&self) -> ChildType {
329 match self {
330 StatefulChild::Worker(_) => ChildType::Worker,
331 StatefulChild::Supervisor { .. } => ChildType::Supervisor,
332 }
333 }
334
335 #[inline]
336 pub fn restart_policy(&self) -> Option<RestartPolicy> {
337 match self {
338 StatefulChild::Worker(w) => Some(w.spec.restart_policy),
339 StatefulChild::Supervisor { .. } => Some(RestartPolicy::Permanent),
340 }
341 }
342
343 pub async fn shutdown(&mut self) {
344 match self {
345 StatefulChild::Worker(w) => w.stop().await,
346 StatefulChild::Supervisor { handle, .. } => {
347 let _ = handle.shutdown().await;
348 }
349 }
350 }
351}
352
353pub(crate) enum StatefulRestartInfo<W: Worker> {
355 Worker(StatefulWorkerSpec<W>),
356 Supervisor(Arc<StatefulSupervisorSpec<W>>),
357}
358
359#[derive(Debug)]
365pub enum StatefulSupervisorError {
366 NoChildren(String),
368 AllChildrenFailed(String),
370 ShuttingDown(String),
372 ChildAlreadyExists(String),
374 ChildNotFound(String),
376 InitializationFailed {
378 child_id: String,
380 reason: String,
382 },
383 InitializationTimeout {
385 child_id: String,
387 timeout: std::time::Duration,
389 },
390}
391
392impl fmt::Display for StatefulSupervisorError {
393 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
394 match self {
395 StatefulSupervisorError::NoChildren(name) => {
396 write!(f, "stateful supervisor '{}' has no children", name)
397 }
398 StatefulSupervisorError::AllChildrenFailed(name) => {
399 write!(
400 f,
401 "all children failed for stateful supervisor '{}' - restart intensity limit exceeded",
402 name
403 )
404 }
405 StatefulSupervisorError::ShuttingDown(name) => {
406 write!(
407 f,
408 "stateful supervisor '{}' is shutting down - operation not permitted",
409 name
410 )
411 }
412 StatefulSupervisorError::ChildAlreadyExists(id) => {
413 write!(
414 f,
415 "child with id '{}' already exists - use a unique identifier",
416 id
417 )
418 }
419 StatefulSupervisorError::ChildNotFound(id) => {
420 write!(
421 f,
422 "child with id '{}' not found - it may have already terminated",
423 id
424 )
425 }
426 StatefulSupervisorError::InitializationFailed { child_id, reason } => {
427 write!(f, "child '{child_id}' initialization failed: {reason}")
428 }
429 StatefulSupervisorError::InitializationTimeout { child_id, timeout } => {
430 write!(
431 f,
432 "child '{}' initialization timed out after {:?}",
433 child_id, timeout
434 )
435 }
436 }
437 }
438}
439
440impl std::error::Error for StatefulSupervisorError {}
441
442pub(crate) enum StatefulSupervisorCommand<W: Worker> {
448 StartChild {
449 spec: StatefulWorkerSpec<W>,
450 respond_to: oneshot::Sender<Result<ChildId, StatefulSupervisorError>>,
451 },
452 StartChildLinked {
453 spec: StatefulWorkerSpec<W>,
454 timeout: std::time::Duration,
455 respond_to: oneshot::Sender<Result<ChildId, StatefulSupervisorError>>,
456 },
457 TerminateChild {
458 id: ChildId,
459 respond_to: oneshot::Sender<Result<(), StatefulSupervisorError>>,
460 },
461 WhichChildren {
462 respond_to: oneshot::Sender<Result<Vec<ChildInfo>, StatefulSupervisorError>>,
463 },
464 GetRestartStrategy {
465 respond_to: oneshot::Sender<RestartStrategy>,
466 },
467 GetUptime {
468 respond_to: oneshot::Sender<u64>,
469 },
470 ChildTerminated {
471 id: ChildId,
472 reason: ChildExitReason,
473 },
474 Shutdown,
475}
476
477impl<W: Worker> From<WorkerTermination> for StatefulSupervisorCommand<W> {
478 fn from(term: WorkerTermination) -> Self {
479 StatefulSupervisorCommand::ChildTerminated {
480 id: term.id,
481 reason: term.reason,
482 }
483 }
484}
485
486pub(crate) struct StatefulSupervisorRuntime<W: Worker> {
488 name: String,
489 children: Vec<StatefulChild<W>>,
490 control_rx: mpsc::UnboundedReceiver<StatefulSupervisorCommand<W>>,
491 control_tx: mpsc::UnboundedSender<StatefulSupervisorCommand<W>>,
492 restart_strategy: RestartStrategy,
493 restart_tracker: RestartTracker,
494 created_at: std::time::Instant,
495}
496
497impl<W: Worker> StatefulSupervisorRuntime<W> {
498 pub(crate) fn new(
499 spec: StatefulSupervisorSpec<W>,
500 control_rx: mpsc::UnboundedReceiver<StatefulSupervisorCommand<W>>,
501 control_tx: mpsc::UnboundedSender<StatefulSupervisorCommand<W>>,
502 ) -> Self {
503 let mut children = Vec::with_capacity(spec.children.len());
504
505 for child_spec in spec.children {
506 match child_spec {
507 StatefulChildSpec::Worker(worker_spec) => {
508 let worker = StatefulWorkerProcess::spawn(
509 worker_spec,
510 spec.name.clone(),
511 control_tx.clone(),
512 );
513 children.push(StatefulChild::Worker(worker));
514 }
515 StatefulChildSpec::Supervisor(supervisor_spec) => {
516 let supervisor = StatefulSupervisorHandle::start((*supervisor_spec).clone());
517 children.push(StatefulChild::Supervisor {
518 handle: supervisor,
519 spec: Arc::clone(&supervisor_spec),
520 });
521 }
522 }
523 }
524
525 Self {
526 name: spec.name,
527 children,
528 control_rx,
529 control_tx,
530 restart_strategy: spec.restart_strategy,
531 restart_tracker: RestartTracker::new(spec.restart_intensity),
532 created_at: std::time::Instant::now(),
533 }
534 }
535
536 pub(crate) async fn run(mut self) {
537 while let Some(command) = self.control_rx.recv().await {
538 match command {
539 StatefulSupervisorCommand::StartChild { spec, respond_to } => {
540 let result = self.handle_start_child(spec).await;
541 let _ = respond_to.send(result);
542 }
543 StatefulSupervisorCommand::StartChildLinked {
544 spec,
545 timeout,
546 respond_to,
547 } => {
548 let result = self.handle_start_child_linked(spec, timeout).await;
549 let _ = respond_to.send(result);
550 }
551 StatefulSupervisorCommand::TerminateChild { id, respond_to } => {
552 let result = self.handle_terminate_child(&id).await;
553 let _ = respond_to.send(result);
554 }
555 StatefulSupervisorCommand::WhichChildren { respond_to } => {
556 let result = self.handle_which_children();
557 let _ = respond_to.send(result);
558 }
559 StatefulSupervisorCommand::GetRestartStrategy { respond_to } => {
560 let _ = respond_to.send(self.restart_strategy);
561 }
562 StatefulSupervisorCommand::GetUptime { respond_to } => {
563 let uptime = self.created_at.elapsed().as_secs();
564 let _ = respond_to.send(uptime);
565 }
566 StatefulSupervisorCommand::ChildTerminated { id, reason } => {
567 self.handle_child_terminated(id, reason).await;
568 }
569 StatefulSupervisorCommand::Shutdown => {
570 self.shutdown_children().await;
571 return;
572 }
573 }
574 }
575
576 self.shutdown_children().await;
577 }
578
579 async fn handle_start_child(
580 &mut self,
581 spec: StatefulWorkerSpec<W>,
582 ) -> Result<ChildId, StatefulSupervisorError> {
583 if self.children.iter().any(|c| c.id() == spec.id) {
585 return Err(StatefulSupervisorError::ChildAlreadyExists(spec.id.clone()));
586 }
587
588 let id = spec.id.clone();
589 let worker = StatefulWorkerProcess::spawn(spec, self.name.clone(), self.control_tx.clone());
590
591 self.children.push(StatefulChild::Worker(worker));
592 tracing::debug!(
593 supervisor = %self.name,
594 child = %id,
595 "dynamically started child"
596 );
597
598 Ok(id)
599 }
600
601 async fn handle_start_child_linked(
602 &mut self,
603 spec: StatefulWorkerSpec<W>,
604 timeout: std::time::Duration,
605 ) -> Result<ChildId, StatefulSupervisorError> {
606 if self.children.iter().any(|c| c.id() == spec.id) {
608 return Err(StatefulSupervisorError::ChildAlreadyExists(spec.id.clone()));
609 }
610
611 let id = spec.id.clone();
612 let (init_tx, init_rx) = oneshot::channel();
613
614 let worker = StatefulWorkerProcess::spawn_with_link(
615 spec,
616 self.name.clone(),
617 self.control_tx.clone(),
618 init_tx,
619 );
620
621 let init_result = tokio::time::timeout(timeout, init_rx).await;
623
624 match init_result {
625 Ok(Ok(Ok(()))) => {
626 self.children.push(StatefulChild::Worker(worker));
628 tracing::debug!(
629 supervisor = %self.name,
630 child = %id,
631 "linked child started successfully"
632 );
633 Ok(id)
634 }
635 Ok(Ok(Err(reason))) => {
636 tracing::error!(
638 supervisor = %self.name,
639 child = %id,
640 reason = %reason,
641 "linked child initialization failed"
642 );
643 Err(StatefulSupervisorError::InitializationFailed {
645 child_id: id,
646 reason,
647 })
648 }
649 Ok(Err(_)) => {
650 tracing::error!(
652 supervisor = %self.name,
653 child = %id,
654 "linked child panicked during initialization"
655 );
656 Err(StatefulSupervisorError::InitializationFailed {
657 child_id: id,
658 reason: "worker panicked during initialization".to_owned(),
659 })
660 }
661 Err(_) => {
662 tracing::error!(
664 supervisor = %self.name,
665 child = %id,
666 timeout_secs = ?timeout.as_secs(),
667 "linked child initialization timed out"
668 );
669 Err(StatefulSupervisorError::InitializationTimeout {
670 child_id: id,
671 timeout,
672 })
673 }
674 }
675 }
676
677 async fn handle_terminate_child(&mut self, id: &str) -> Result<(), StatefulSupervisorError> {
678 let position = self
679 .children
680 .iter()
681 .position(|c| c.id() == id)
682 .ok_or_else(|| StatefulSupervisorError::ChildNotFound(id.to_owned()))?;
683
684 let mut child = self.children.remove(position);
685 child.shutdown().await;
686
687 tracing::debug!(
688 supervisor = %self.name,
689 child = %id,
690 "terminated child"
691 );
692 Ok(())
693 }
694
695 fn handle_which_children(&self) -> Result<Vec<ChildInfo>, StatefulSupervisorError> {
696 let info = self
697 .children
698 .iter()
699 .map(|child| ChildInfo {
700 id: child.id().to_owned(),
701 child_type: child.child_type(),
702 restart_policy: child.restart_policy(),
703 })
704 .collect();
705
706 Ok(info)
707 }
708
709 async fn handle_child_terminated(&mut self, id: ChildId, reason: ChildExitReason) {
710 tracing::debug!(
711 supervisor = %self.name,
712 child = %id,
713 reason = ?reason,
714 "child terminated"
715 );
716
717 let position = match self.children.iter().position(|c| c.id() == id) {
718 Some(pos) => pos,
719 None => {
720 tracing::warn!(
721 supervisor = %self.name,
722 child = %id,
723 "terminated child not found in list"
724 );
725 return;
726 }
727 };
728
729 let should_restart = match &self.children[position] {
731 StatefulChild::Worker(w) => match w.spec.restart_policy {
732 RestartPolicy::Permanent => true,
733 RestartPolicy::Temporary => false,
734 RestartPolicy::Transient => reason == ChildExitReason::Abnormal,
735 },
736 StatefulChild::Supervisor { .. } => true, };
738
739 if !should_restart {
740 tracing::debug!(
741 supervisor = %self.name,
742 child = %id,
743 policy = ?self.children[position].restart_policy(),
744 reason = ?reason,
745 "not restarting child"
746 );
747 self.children.remove(position);
748 return;
749 }
750
751 if self.restart_tracker.record_restart() {
753 tracing::error!(
754 supervisor = %self.name,
755 "restart intensity exceeded, shutting down"
756 );
757 self.shutdown_children().await;
758 return;
759 }
760
761 match self.restart_strategy {
763 RestartStrategy::OneForOne => {
764 self.restart_child(position).await;
765 }
766 RestartStrategy::OneForAll => {
767 self.restart_all_children().await;
768 }
769 RestartStrategy::RestForOne => {
770 self.restart_from(position).await;
771 }
772 }
773 }
774
775 async fn restart_child(&mut self, position: usize) {
776 let restart_info = match &self.children[position] {
778 StatefulChild::Worker(worker) => StatefulRestartInfo::Worker(worker.spec.clone()),
779 StatefulChild::Supervisor { spec, .. } => {
780 StatefulRestartInfo::Supervisor(Arc::clone(spec))
781 }
782 };
783
784 self.children[position].shutdown().await;
786
787 match restart_info {
789 StatefulRestartInfo::Worker(spec) => {
790 tracing::debug!(
791 supervisor = %self.name,
792 worker = %spec.id,
793 "restarting worker"
794 );
795 let new_worker = StatefulWorkerProcess::spawn(
796 spec.clone(),
797 self.name.clone(),
798 self.control_tx.clone(),
799 );
800 self.children[position] = StatefulChild::Worker(new_worker);
801 tracing::debug!(
802 supervisor = %self.name,
803 worker = %spec.id,
804 "worker restarted"
805 );
806 }
807 StatefulRestartInfo::Supervisor(spec) => {
808 let name = spec.name.clone();
809 tracing::debug!(
810 supervisor = %self.name,
811 child_supervisor = %name,
812 "restarting supervisor"
813 );
814 let new_handle = StatefulSupervisorHandle::start((*spec).clone());
815 self.children[position] = StatefulChild::Supervisor {
816 handle: new_handle,
817 spec,
818 };
819 tracing::debug!(
820 supervisor = %self.name,
821 child_supervisor = %name,
822 "supervisor restarted"
823 );
824 }
825 }
826 }
827
828 async fn restart_all_children(&mut self) {
829 tracing::debug!(
830 supervisor = %self.name,
831 "restarting all children (one_for_all)"
832 );
833
834 for child in &mut self.children {
836 child.shutdown().await;
837 }
838
839 for child in &mut self.children {
841 if let StatefulChild::Worker(worker) = child {
842 let spec = worker.spec.clone();
843 let new_worker = StatefulWorkerProcess::spawn(
844 spec.clone(),
845 self.name.clone(),
846 self.control_tx.clone(),
847 );
848 *child = StatefulChild::Worker(new_worker);
849 tracing::debug!(
850 supervisor = %self.name,
851 child = %spec.id,
852 "child restarted"
853 );
854 }
855 }
856 }
857
858 async fn restart_from(&mut self, position: usize) {
859 tracing::debug!(
860 supervisor = %self.name,
861 position = %position,
862 "restarting from position (rest_for_one)"
863 );
864
865 for i in position..self.children.len() {
866 self.children[i].shutdown().await;
867
868 if let StatefulChild::Worker(worker) = &self.children[i] {
869 let spec = worker.spec.clone();
870 let new_worker = StatefulWorkerProcess::spawn(
871 spec.clone(),
872 self.name.clone(),
873 self.control_tx.clone(),
874 );
875 self.children[i] = StatefulChild::Worker(new_worker);
876 tracing::debug!(
877 supervisor = %self.name,
878 child = %spec.id,
879 "child restarted"
880 );
881 }
882 }
883 }
884
885 async fn shutdown_children(&mut self) {
886 for child in self.children.drain(..) {
887 let id = child.id().to_owned();
888 let mut child = child;
889 child.shutdown().await;
890 tracing::debug!(
891 supervisor = %self.name,
892 child = %id,
893 "shut down child"
894 );
895 }
896 }
897}
898
899#[derive(Clone)]
905pub struct StatefulSupervisorHandle<W: Worker> {
906 pub(crate) name: Arc<String>,
907 pub(crate) control_tx: mpsc::UnboundedSender<StatefulSupervisorCommand<W>>,
908}
909
910impl<W: Worker> StatefulSupervisorHandle<W> {
911 pub fn start(spec: StatefulSupervisorSpec<W>) -> Self {
913 let (control_tx, control_rx) = mpsc::unbounded_channel();
914 let name_arc = Arc::new(spec.name.clone());
915 let runtime = StatefulSupervisorRuntime::new(spec, control_rx, control_tx.clone());
916
917 let runtime_name = Arc::clone(&name_arc);
918 tokio::spawn(async move {
919 runtime.run().await;
920 tracing::debug!(name = %*runtime_name, "supervisor stopped");
921 });
922
923 Self {
924 name: name_arc,
925 control_tx,
926 }
927 }
928
929 pub async fn start_child(
931 &self,
932 id: impl Into<String>,
933 factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
934 restart_policy: RestartPolicy,
935 context: Arc<WorkerContext>,
936 ) -> Result<ChildId, StatefulSupervisorError> {
937 let (result_tx, result_rx) = oneshot::channel();
938 let spec = StatefulWorkerSpec::new(id, factory, restart_policy, context);
939
940 self.control_tx
941 .send(StatefulSupervisorCommand::StartChild {
942 spec,
943 respond_to: result_tx,
944 })
945 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
946
947 result_rx
948 .await
949 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
950 }
951
952 pub async fn start_child_linked(
977 &self,
978 id: impl Into<String>,
979 factory: impl Fn(Arc<WorkerContext>) -> W + Send + Sync + 'static,
980 restart_policy: RestartPolicy,
981 context: Arc<WorkerContext>,
982 timeout: std::time::Duration,
983 ) -> Result<ChildId, StatefulSupervisorError> {
984 let (result_tx, result_rx) = oneshot::channel();
985 let spec = StatefulWorkerSpec::new(id, factory, restart_policy, context);
986
987 self.control_tx
988 .send(StatefulSupervisorCommand::StartChildLinked {
989 spec,
990 timeout,
991 respond_to: result_tx,
992 })
993 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
994
995 result_rx
996 .await
997 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
998 }
999
1000 pub async fn terminate_child(&self, id: &str) -> Result<(), StatefulSupervisorError> {
1002 let (result_tx, result_rx) = oneshot::channel();
1003
1004 self.control_tx
1005 .send(StatefulSupervisorCommand::TerminateChild {
1006 id: id.to_owned(),
1007 respond_to: result_tx,
1008 })
1009 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1010
1011 result_rx
1012 .await
1013 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
1014 }
1015
1016 pub async fn which_children(&self) -> Result<Vec<ChildInfo>, StatefulSupervisorError> {
1018 let (result_tx, result_rx) = oneshot::channel();
1019
1020 self.control_tx
1021 .send(StatefulSupervisorCommand::WhichChildren {
1022 respond_to: result_tx,
1023 })
1024 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1025
1026 result_rx
1027 .await
1028 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?
1029 }
1030
1031 pub async fn shutdown(&self) -> Result<(), StatefulSupervisorError> {
1033 self.control_tx
1034 .send(StatefulSupervisorCommand::Shutdown)
1035 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1036 Ok(())
1037 }
1038
1039 pub fn name(&self) -> &str {
1041 self.name.as_str()
1042 }
1043
1044 pub async fn restart_strategy(&self) -> Result<RestartStrategy, StatefulSupervisorError> {
1046 let (result_tx, result_rx) = oneshot::channel();
1047
1048 self.control_tx
1049 .send(StatefulSupervisorCommand::GetRestartStrategy {
1050 respond_to: result_tx,
1051 })
1052 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1053
1054 result_rx
1055 .await
1056 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))
1057 }
1058
1059 pub async fn uptime(&self) -> Result<u64, StatefulSupervisorError> {
1061 let (result_tx, result_rx) = oneshot::channel();
1062
1063 self.control_tx
1064 .send(StatefulSupervisorCommand::GetUptime {
1065 respond_to: result_tx,
1066 })
1067 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))?;
1068
1069 result_rx
1070 .await
1071 .map_err(|_| StatefulSupervisorError::ShuttingDown(self.name().to_owned()))
1072 }
1073}