use anyhow::Result;
use rsactor::{message_handlers, Actor, ActorRef, ActorWeak};
use std::time::Duration;
use tracing::{debug, info};
/// Message asking the actor for a snapshot of its state.
///
/// The matching handler replies with `(factor, latest_value, latest_timestamp)`.
struct GetState;
/// Message carrying a new processing factor.
///
/// The matching handler stores the wrapped value as the actor's factor and
/// replies with the factor now in effect.
struct SetFactor(f64);
/// Message delivering one data sample to the actor.
///
/// The matching handler multiplies `value` by the actor's factor and records
/// the result together with `timestamp`.
// NOTE(review): nothing in the visible code constructs this message — confirm
// whether a producer exists elsewhere.
struct ProcessedData {
/// Sample value before the actor's factor is applied.
value: f64,
/// Instant attached to the sample; the handler logs its age via `elapsed()`.
timestamp: std::time::Instant,
}
/// Commands that can be delivered to the actor wrapped in `SendTaskCommand`.
enum TaskCommand {
/// Replace the actor's tick interval with one using the given period.
ChangeInterval(Duration),
}
/// Message wrapping a `TaskCommand`; the matching handler replies with a `bool`.
struct SendTaskCommand(TaskCommand);
/// Actor that generates a value on every interval tick, scales it by `factor`,
/// and keeps the most recent result available for queries.
struct DataProcessorActor {
/// Multiplier applied to every raw value before it is stored.
factor: f64,
/// Most recent scaled value, or `None` until the first one is produced.
latest_value: Option<f64>,
/// Instant associated with `latest_value`, or `None` until the first one is produced.
latest_timestamp: Option<std::time::Instant>,
/// Timer awaited in `on_run`; replaced when a `ChangeInterval` command arrives.
interval: tokio::time::Interval,
/// Set to `true` in `on_start`; not read anywhere in the visible code.
running: bool,
}
impl Actor for DataProcessorActor {
    type Args = ();
    type Error = anyhow::Error;

    /// Builds the actor's initial state.
    ///
    /// The factor starts at `1.0`, no value has been produced yet, and data is
    /// generated on a 500 ms interval.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`; the `Result` is part of the
    /// trait signature.
    async fn on_start(_args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
        // Period between generated samples until a `ChangeInterval` command replaces it.
        const INITIAL_PERIOD: Duration = Duration::from_millis(500);

        info!(
            "DataProcessorActor (id: {}) starting...",
            actor_ref.identity()
        );

        // Every field is initialised exactly once here. The previous version
        // re-assigned `interval` and `running` right after construction, which
        // only built and dropped an extra timer.
        let actor = Self {
            factor: 1.0,
            latest_value: None,
            latest_timestamp: None,
            interval: tokio::time::interval(INITIAL_PERIOD),
            running: true,
        };

        info!("DataProcessorActor started with event-based processing");
        Ok(actor)
    }

    /// Produces one data point per interval tick.
    ///
    /// Waits for the next tick, draws a random raw value scaled by `100.0`,
    /// multiplies it by the current factor, and stores the result along with
    /// the current instant.
    ///
    /// Always returns `Ok(true)`.
    // NOTE(review): `true` presumably asks the runtime to keep invoking
    // `on_run` — confirm against the rsactor `Actor::on_run` documentation.
    async fn on_run(&mut self, _actor_ref: &ActorWeak<Self>) -> Result<bool, Self::Error> {
        self.interval.tick().await;

        let raw_value = rand::random::<f64>() * 100.0;
        let processed_value = raw_value * self.factor;
        self.latest_value = Some(processed_value);
        self.latest_timestamp = Some(std::time::Instant::now());

        debug!("Generated data: original={raw_value:.2}, processed={processed_value:.2}");
        Ok(true)
    }
}
#[message_handlers]
impl DataProcessorActor {
#[handler]
async fn handle_get_state(
&mut self,
_msg: GetState,
_: &ActorRef<Self>,
) -> (f64, Option<f64>, Option<std::time::Instant>) {
(self.factor, self.latest_value, self.latest_timestamp)
}
#[handler]
async fn handle_set_factor(&mut self, msg: SetFactor, _: &ActorRef<Self>) -> f64 {
let old_factor = self.factor;
self.factor = msg.0;
info!(
"Changed factor from {:.2} to {:.2}",
old_factor, self.factor
);
self.factor
}
#[handler]
async fn handle_processed_data(&mut self, msg: ProcessedData, _: &ActorRef<Self>) {
let processed_value = msg.value * self.factor;
self.latest_value = Some(processed_value);
self.latest_timestamp = Some(msg.timestamp);
debug!(
"Received data from task: original={:.2}, processed={:.2}, age={:?}",
msg.value,
processed_value,
msg.timestamp.elapsed()
);
}
#[handler]
async fn handle_send_task_command(
&mut self,
msg: SendTaskCommand,
_actor_ref: &ActorRef<Self>,
) -> bool {
match msg.0 {
TaskCommand::ChangeInterval(new_interval) => {
self.interval = tokio::time::interval(new_interval);
info!("Successfully changed interval");
true
}
}
}
}
/// Demo driver: spawns a `DataProcessorActor`, queries and reconfigures it
/// through messages, then stops it and reports how it finished.
#[tokio::main]
async fn main() -> Result<()> {
    // Shared by both outcome arms below, which print the same summary line.
    fn report_final_state(actor: &DataProcessorActor) {
        println!(
            "Final state: factor={:.2}, latest_value={:?}",
            actor.factor, actor.latest_value
        );
    }

    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_target(false)
        .init();
    info!("Starting actor-task communication example");

    let (actor_ref, join_handle) = rsactor::spawn::<DataProcessorActor>(());

    // Give the actor time to produce a few values before the first query.
    tokio::time::sleep(Duration::from_secs(2)).await;

    let initial_state: (f64, Option<f64>, Option<std::time::Instant>) =
        actor_ref.ask(GetState).await?;
    let (factor, latest_value, timestamp) = initial_state;
    println!("Current state: factor={factor:.2}, latest_value={latest_value:?}");
    if let Some(age) = timestamp.map(|ts| ts.elapsed()) {
        println!("Data age: {:?}", age);
    }

    println!("Changing processing factor to 2.5...");
    let new_factor: f64 = actor_ref.ask(SetFactor(2.5)).await?;
    println!("Factor changed to: {new_factor:.2}");

    println!("Changing the task's data generation interval...");
    let faster_period = Duration::from_millis(200);
    let change_command = SendTaskCommand(TaskCommand::ChangeInterval(faster_period));
    let interval_changed = actor_ref.ask(change_command).await?;
    let outcome_line = if interval_changed {
        "Successfully changed task interval"
    } else {
        "Failed to change task interval"
    };
    println!("{}", outcome_line);

    // Let the actor run for a while with the new settings.
    tokio::time::sleep(Duration::from_secs(3)).await;

    let updated_state: (f64, Option<f64>, Option<std::time::Instant>) =
        actor_ref.ask(GetState).await?;
    let (factor, latest_value, timestamp) = updated_state;
    println!("Updated state: factor={factor:.2}, latest_value={latest_value:?}");
    if let Some(age) = timestamp.map(|ts| ts.elapsed()) {
        println!("Data age: {:?}", age);
    }

    println!("Stopping actor...");
    actor_ref.stop().await?;

    match join_handle.await? {
        rsactor::ActorResult::Completed { actor, killed } => {
            println!("Actor completed successfully. Killed: {killed}");
            report_final_state(&actor);
        }
        rsactor::ActorResult::Failed {
            actor,
            error,
            killed,
            phase,
        } => {
            println!("Actor stop failed: {error}. Phase: {phase}, Killed: {killed}");
            // The actor instance may be unavailable after a failure.
            if let Some(actor) = actor {
                report_final_state(&actor);
            }
        }
    }

    Ok(())
}