use anyhow::Result;
use rsactor::{message_handlers, Actor, ActorRef};
use std::thread;
use std::time::Duration;
use tokio::sync::mpsc; use tokio::task;
use tracing::{debug, info};
/// Request message asking the actor for a snapshot of its current state.
///
/// Handled by `handle_get_state`, which replies with the tuple
/// `(factor, latest_value, latest_timestamp)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct GetState;
/// Request message that replaces the actor's processing factor.
///
/// The wrapped `f64` becomes the new multiplier applied to every incoming
/// value; the handler replies with the factor now in effect.
#[derive(Debug, Clone, Copy, PartialEq)]
struct SetFactor(f64);
/// A single sample produced by the synchronous background task and sent to
/// the actor.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ProcessedData {
    /// Raw sample value, before the actor's factor is applied.
    value: f64,
    /// Moment at which the sample was created by the background task.
    timestamp: std::time::Instant,
}
/// Control commands forwarded by the actor to the synchronous background task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskCommand {
    /// Replace the delay the task sleeps between two generated samples.
    ChangeInterval(Duration),
    /// Ask the task to leave its loop and finish.
    Stop,
}
/// Actor that receives samples from a synchronous background task, scales
/// them by a configurable factor, and remembers the most recent result.
struct SyncDataProcessorActor {
/// Multiplier applied to every incoming sample value (initialised to 1.0 in `on_start`).
factor: f64,
/// Most recent sample value after multiplication by `factor`; `None` until the first sample arrives.
latest_value: Option<f64>,
/// Timestamp carried by the most recent sample; `None` until the first sample arrives.
latest_timestamp: Option<std::time::Instant>,
/// Sending half of the command channel whose receiver is owned by the background task.
task_sender: mpsc::Sender<TaskCommand>,
/// Join handle of the `spawn_blocking` background task; `main` awaits it after the actor completes.
task_handle: task::JoinHandle<()>,
}
impl Actor for SyncDataProcessorActor {
type Args = ();
type Error = anyhow::Error;
async fn on_start(_args: Self::Args, actor_ref: &ActorRef<Self>) -> Result<Self, Self::Error> {
info!(
"SyncDataProcessorActor (id: {}) starting...",
actor_ref.identity()
);
let (task_tx, mut task_rx) = mpsc::channel::<TaskCommand>(32);
let task_actor_ref = actor_ref.clone();
let handle = task::spawn_blocking(move || {
info!("Synchronous background task started");
let mut interval = Duration::from_millis(500);
let mut running = true;
while running {
thread::sleep(interval);
let raw_value = rand::random::<f64>() * 100.0;
debug!("Sync task sending value {raw_value:.2} to actor");
if let Err(e) = task_actor_ref.blocking_tell(
ProcessedData {
value: raw_value,
timestamp: std::time::Instant::now(),
},
None,
) {
info!("Failed to send data to actor: {e}");
running = false;
}
match task_rx.try_recv() {
Ok(cmd) => match cmd {
TaskCommand::ChangeInterval(new_interval) => {
info!("Sync task changing interval to {new_interval:?}");
interval = new_interval;
}
TaskCommand::Stop => {
info!("Sync task received stop command");
running = false;
}
},
Err(mpsc::error::TryRecvError::Empty) => {
}
Err(mpsc::error::TryRecvError::Disconnected) => {
info!("Task command channel closed, stopping task");
running = false;
}
}
}
info!("Synchronous background task stopping");
});
let actor = Self {
factor: 1.0,
latest_value: None,
latest_timestamp: None,
task_sender: task_tx,
task_handle: handle,
};
info!("SyncDataProcessorActor started and sync background task spawned");
Ok(actor)
}
}
#[message_handlers]
impl SyncDataProcessorActor {
#[handler]
async fn handle_get_state(
&mut self,
_msg: GetState,
_: &ActorRef<Self>,
) -> (f64, Option<f64>, Option<std::time::Instant>) {
(self.factor, self.latest_value, self.latest_timestamp)
}
#[handler]
async fn handle_set_factor(&mut self, msg: SetFactor, _: &ActorRef<Self>) -> f64 {
let old_factor = self.factor;
self.factor = msg.0;
info!(
"Changed factor from {:.2} to {:.2}",
old_factor, self.factor
);
self.factor
}
#[handler]
async fn handle_processed_data(&mut self, msg: ProcessedData, _: &ActorRef<Self>) {
let processed_value = msg.value * self.factor;
self.latest_value = Some(processed_value);
self.latest_timestamp = Some(msg.timestamp);
debug!(
"Received data from sync task: original={:.2}, processed={:.2}, age={:?}",
msg.value,
processed_value,
msg.timestamp.elapsed()
);
}
#[handler]
async fn handle_task_command(&mut self, msg: TaskCommand, _: &ActorRef<Self>) -> bool {
match self.task_sender.send(msg).await {
Ok(_) => {
info!("Sent command to sync task");
true
}
Err(_) => {
info!("Failed to send command to sync task");
false
}
}
}
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_target(false)
.init();
info!("Starting actor-sync-task communication example");
let (actor_ref, join_handle) = rsactor::spawn::<SyncDataProcessorActor>(());
tokio::time::sleep(Duration::from_secs(2)).await;
let (factor, latest_value, timestamp): (f64, Option<f64>, Option<std::time::Instant>) =
actor_ref.ask(GetState).await?;
println!("Current state: factor={factor:.2}, latest_value={latest_value:?}");
if let Some(ts) = timestamp {
println!("Data age: {:?}", ts.elapsed());
}
println!("Changing processing factor to 2.5...");
let new_factor: f64 = actor_ref.ask(SetFactor(2.5)).await?;
println!("Factor changed to: {new_factor:.2}");
println!("Changing the sync task's data generation interval...");
let command_result = actor_ref
.ask(TaskCommand::ChangeInterval(Duration::from_millis(200)))
.await?;
if command_result {
println!("Successfully changed sync task interval");
} else {
println!("Failed to change sync task interval");
}
tokio::time::sleep(Duration::from_secs(3)).await;
let (factor, latest_value, timestamp): (f64, Option<f64>, Option<std::time::Instant>) =
actor_ref.ask(GetState).await?;
println!("Updated state: factor={factor:.2}, latest_value={latest_value:?}");
if let Some(ts) = timestamp {
println!("Data age: {:?}", ts.elapsed());
}
actor_ref.ask(TaskCommand::Stop).await?;
println!("Stopping actor...");
actor_ref.stop().await?;
let result = join_handle.await?;
match result {
rsactor::ActorResult::Completed { actor, killed } => {
println!("Actor completed successfully. Killed: {killed}");
println!(
"Final state: factor={:.2}, latest_value={:?}",
actor.factor, actor.latest_value
);
actor.task_handle.await.expect("Failed to join task handle");
}
rsactor::ActorResult::Failed {
actor,
error,
phase,
killed,
} => {
println!("Actor stop failed: {error}. Phase: {phase}, Killed: {killed}");
if let Some(actor) = actor {
println!(
"Final state: factor={:.2}, latest_value={:?}",
actor.factor, actor.latest_value
);
}
}
}
Ok(())
}