acton_core/actor/managed_agent/
started.rs

1/*
2 * Copyright (c) 2024. Govcraft
3 *
4 * Licensed under either of
5 *   * Apache License, Version 2.0 (the "License");
6 *     you may not use this file except in compliance with the License.
7 *     You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8 *   * MIT license: http://opensource.org/licenses/MIT
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the applicable License for the specific language governing permissions and
14 * limitations under that License.
15 */
16
17use std::any::type_name_of_val;
18use std::fmt::Debug;
19use std::time::Duration;
20
21use futures::future::join_all;
22use tokio::time::sleep;
23use tracing::{instrument, trace};
24
25use crate::actor::ManagedAgent;
26use crate::common::{Envelope, OutboundEnvelope, ReactorItem, ReactorMap};
27use crate::message::{BrokerRequestEnvelope, MessageAddress, SystemSignal};
28use crate::traits::Actor;
29
30/// The `Started` state of the actor.
31pub struct Started;
32
33impl<Agent: Default + Send + Debug + 'static> ManagedAgent<Started, Agent> {
34    /// Creates a new outbound envelope for the actor.
35    ///
36    /// # Returns
37    /// An optional `OutboundEnvelope` if the context's outbox is available.
38    pub fn new_envelope(&self) -> Option<OutboundEnvelope> {
39        Option::from(OutboundEnvelope::new(MessageAddress::new(
40            self.handle.outbox.clone(),
41            self.id.clone(),
42        )))
43    }
44
45    /// Creates a new parent envelope for the actor.
46    ///
47    /// # Returns
48    /// A clone of the parent's return envelope.
49    pub fn new_parent_envelope(&self) -> Option<OutboundEnvelope> {
50        self.parent.as_ref().map(|parent| parent.create_envelope(None).clone())
51    }
52
53    #[instrument(skip(reactors, self))]
54    pub(crate) async fn wake(&mut self, reactors: ReactorMap<Agent>) {
55        (self.after_start)(self).await;
56        let mut terminate_requested = false;
57        while let Some(incoming_envelope) = self.inbox.recv().await {
58            let type_id;
59            let mut envelope;
60            trace!("envelope sender is {}", incoming_envelope.reply_to.sender.root);
61            trace!("{}", type_name_of_val(&incoming_envelope.message));
62            // Special case for BrokerRequestEnvelope
63            if let Some(broker_request_envelope) = incoming_envelope
64                .message
65                .as_any()
66                .downcast_ref::<BrokerRequestEnvelope>()
67            {
68                envelope = Envelope::new(
69                    broker_request_envelope.message.clone(),
70                    incoming_envelope.reply_to.clone(),
71                    incoming_envelope.recipient.clone(),
72                );
73                type_id = broker_request_envelope.message.as_any().type_id();
74            } else {
75                envelope = incoming_envelope;
76                type_id = envelope.message.as_any().type_id();
77            }
78
79            if let Some(reactor) = reactors.get(&type_id) {
80                match reactor.value() {
81                    ReactorItem::FutureReactor(fut) => fut(self, &mut envelope).await,
82                }
83            } else if let Some(SystemSignal::Terminate) =
84                envelope.message.as_any().downcast_ref::<SystemSignal>()
85            {
86                // Set the termination flag
87                terminate_requested = true;
88                trace!("Termination signal received, waiting for remaining messages...");
89                (self.before_stop)(self).await;
90                //give the before_stop a chance to process the termination signal
91                sleep(Duration::from_millis(10)).await;
92                self.inbox.close();
93            }
94            if terminate_requested && self.inbox.is_empty() && self.inbox.is_closed() {
95                self.inbox.close();
96                self.terminate().await;
97                break;
98            }
99        }
100
101        (self.after_stop)(self).await;
102    }
103    #[instrument(skip(self))]
104    async fn terminate(&mut self) {
105
106        // Collect suspend futures for all children
107        let suspend_futures: Vec<_> = self.handle.children().iter().map(|item| {
108            let child_ref = item.value().clone(); // Clone to take ownership
109            async move {
110                let _ = child_ref.stop().await;
111            }
112        }).collect();
113
114        // Wait for all children to suspend concurrently
115        join_all(suspend_futures).await;
116
117        trace!(
118        actor = self.id.to_string(),
119        "All subordinates terminated. Closing mailbox for"
120    );
121
122        self.inbox.close();
123    }
124}