use crate::actor::{Actor, ActorError};
use crate::context::Context;
use crate::event::{Event, EventTrait, IntoEvent};
use crate::integration::error::Error as IntegrationError;
use crate::machine::Machine;
use crate::state::State;
use crate::MachineBuilder;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::PoisonError;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
pub enum CounterEvent {
#[default]
Increment,
Decrement,
Print,
}
impl EventTrait for CounterEvent {
fn event_type(&self) -> &str {
match self {
CounterEvent::Increment => "INCREMENT",
CounterEvent::Decrement => "DECREMENT",
CounterEvent::Print => "PRINT",
}
}
fn payload(&self) -> Option<&serde_json::Value> {
None }
fn name(&self) -> &str {
self.event_type() }
}
impl IntoEvent for CounterEvent {
fn into_event(self) -> Event {
Event::new(self.event_type())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct CounterState {
pub count: i32,
}
#[derive(Debug, Clone)]
pub struct CounterActor {
state: Arc<Mutex<CounterState>>,
_machine: Arc<Mutex<Machine<Context, CounterEvent, String, ()>>>,
}
impl CounterActor {
pub async fn new() -> Self {
let initial_state = CounterState::default();
let machine_result = MachineBuilder::<Context, CounterEvent, String, ()>::new(
"counter_machine".to_string(),
"Idle".to_string(),
)
.state(State::new("Idle".to_string())) .context(Context::new())
.build()
.await;
let machine = match machine_result {
Ok(m) => m,
Err(e) => {
panic!("Failed to build dummy machine: {}", e);
}
};
Self {
state: Arc::new(Mutex::new(initial_state)),
_machine: Arc::new(Mutex::new(machine)),
}
}
pub fn get_state(&self) -> Result<CounterState, IntegrationError> {
let state = self
.state
.lock()
.map_err(|_e: PoisonError<_>| IntegrationError::LockError)?;
Ok(state.clone())
}
pub fn increment(&self) -> Result<(), IntegrationError> {
let mut state = self
.state
.lock()
.map_err(|_e: PoisonError<_>| IntegrationError::LockError)?;
state.count += 1;
Ok(())
}
pub fn decrement(&self) -> Result<(), IntegrationError> {
let mut state = self
.state
.lock()
.map_err(|_e: PoisonError<_>| IntegrationError::LockError)?;
state.count -= 1;
Ok(())
}
}
#[async_trait]
impl Actor for CounterActor {
type Context = Context;
type StateId = String;
type Event = CounterEvent;
type Output = ();
type State = CounterState;
fn initial_state(&self) -> Self::State {
CounterState::default()
}
async fn receive(
&self,
state: Self::State, event: Self::Event,
) -> Result<Self::State, ActorError> {
log::debug!(
"CounterActor executing receive for event: {:?} with state: {:?}",
event,
state
);
let mut new_state = state.clone();
match event {
CounterEvent::Increment => {
new_state.count += 1;
log::info!("Counter incremented: {}", new_state.count);
}
CounterEvent::Decrement => {
new_state.count -= 1;
log::info!("Counter decremented: {}", new_state.count);
}
CounterEvent::Print => {
log::info!("Current count (in receive): {}", new_state.count);
}
}
Ok(new_state)
}
}