aper_stateroom/
lib.rs

1use aper::connection::{MessageToClient, MessageToServer, ServerConnection, ServerHandle};
2use chrono::serde::ts_milliseconds;
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5pub use state_program::{StateMachineContainerProgram, StateProgram};
6pub use stateroom::ClientId;
7use stateroom::{MessagePayload, StateroomContext, StateroomService};
8use std::collections::HashMap;
9
10mod state_program;
11
12pub struct AperStateroomService<P: StateProgram> {
13    connection: ServerConnection<P>,
14    suspended_event: Option<IntentEvent<P::T>>,
15    client_connections: HashMap<ClientId, ServerHandle<P>>,
16
17    /// Pseudo-connection for sending timer events.
18    timer_event_handle: ServerHandle<P>,
19}
20
21impl<P: StateProgram> Default for AperStateroomService<P> {
22    fn default() -> Self {
23        let mut connection = ServerConnection::new();
24        let timer_event_handle = connection.connect(|_| {});
25
26        AperStateroomService {
27            connection,
28            suspended_event: None,
29            client_connections: HashMap::new(),
30            timer_event_handle,
31        }
32    }
33}
34
35impl<P: StateProgram> AperStateroomService<P> {
36    fn update_suspended_event(&mut self, ctx: &impl StateroomContext) {
37        let susp = self.connection.state().suspended_event();
38        if susp == self.suspended_event {
39            return;
40        }
41
42        if let Some(ev) = &susp {
43            let dur = ev.timestamp.signed_duration_since(Utc::now());
44            ctx.set_timer(dur.num_milliseconds().max(0) as u32);
45        }
46
47        self.suspended_event = susp;
48    }
49
50    fn process_message(
51        &mut self,
52        message: MessageToServer,
53        client_id: Option<ClientId>,
54        ctx: &impl StateroomContext,
55    ) {
56        if let Some(handle) = client_id.and_then(|id| self.client_connections.get_mut(&id)) {
57            handle.receive(&message);
58        } else {
59            self.timer_event_handle.receive(&message);
60        }
61
62        self.update_suspended_event(ctx);
63    }
64}
65
66impl<P: StateProgram> StateroomService for AperStateroomService<P>
67where
68    P::T: Unpin + Send + Sync + 'static,
69{
70    fn init(&mut self, ctx: &impl StateroomContext) {
71        self.update_suspended_event(ctx);
72    }
73
74    fn connect(&mut self, client_id: ClientId, ctx: &impl StateroomContext) {
75        let ctx = Clone::clone(ctx);
76        let callback = move |message: &MessageToClient| {
77            ctx.send_message(client_id, bincode::serialize(&message).unwrap());
78        };
79
80        let handle = self.connection.connect(callback);
81
82        self.client_connections.insert(client_id, handle);
83    }
84
85    fn disconnect(&mut self, user: ClientId, _ctx: &impl StateroomContext) {
86        self.client_connections.remove(&user);
87    }
88
89    fn message(
90        &mut self,
91        client_id: ClientId,
92        message: MessagePayload,
93        ctx: &impl StateroomContext,
94    ) {
95        match message {
96            MessagePayload::Text(txt) => {
97                let message: MessageToServer = serde_json::from_str(&txt).unwrap();
98                self.process_message(message, Some(client_id), ctx);
99            }
100            MessagePayload::Bytes(bytes) => {
101                let message: MessageToServer = bincode::deserialize(&bytes).unwrap();
102                self.process_message(message, Some(client_id), ctx);
103            }
104        }
105    }
106
107    fn timer(&mut self, ctx: &impl StateroomContext) {
108        if let Some(mut event) = self.suspended_event.take() {
109            event.timestamp = Utc::now();
110            let event = bincode::serialize(&event).unwrap();
111            self.process_message(
112                MessageToServer::Intent {
113                    intent: event,
114                    client_version: 0,
115                },
116                None,
117                ctx,
118            );
119        }
120    }
121}
122
123pub type Timestamp = DateTime<Utc>;
124
125#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
126pub struct IntentEvent<T>
127where
128    T: Unpin + Send + Sync + 'static + Clone,
129{
130    #[serde(with = "ts_milliseconds")]
131    pub timestamp: Timestamp,
132    pub client: Option<u32>,
133    pub intent: T,
134}
135
136impl<T> IntentEvent<T>
137where
138    T: Unpin + Send + Sync + 'static + Clone,
139{
140    pub fn new(client: Option<u32>, timestamp: Timestamp, intent: T) -> IntentEvent<T> {
141        IntentEvent {
142            timestamp,
143            client,
144            intent,
145        }
146    }
147}