1use aper::connection::{MessageToClient, MessageToServer, ServerConnection, ServerHandle};
2use chrono::serde::ts_milliseconds;
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5pub use state_program::{StateMachineContainerProgram, StateProgram};
6pub use stateroom::ClientId;
7use stateroom::{MessagePayload, StateroomContext, StateroomService};
8use std::collections::HashMap;
9
10mod state_program;
11
12pub struct AperStateroomService<P: StateProgram> {
13 connection: ServerConnection<P>,
14 suspended_event: Option<IntentEvent<P::T>>,
15 client_connections: HashMap<ClientId, ServerHandle<P>>,
16
17 timer_event_handle: ServerHandle<P>,
19}
20
21impl<P: StateProgram> Default for AperStateroomService<P> {
22 fn default() -> Self {
23 let mut connection = ServerConnection::new();
24 let timer_event_handle = connection.connect(|_| {});
25
26 AperStateroomService {
27 connection,
28 suspended_event: None,
29 client_connections: HashMap::new(),
30 timer_event_handle,
31 }
32 }
33}
34
35impl<P: StateProgram> AperStateroomService<P> {
36 fn update_suspended_event(&mut self, ctx: &impl StateroomContext) {
37 let susp = self.connection.state().suspended_event();
38 if susp == self.suspended_event {
39 return;
40 }
41
42 if let Some(ev) = &susp {
43 let dur = ev.timestamp.signed_duration_since(Utc::now());
44 ctx.set_timer(dur.num_milliseconds().max(0) as u32);
45 }
46
47 self.suspended_event = susp;
48 }
49
50 fn process_message(
51 &mut self,
52 message: MessageToServer,
53 client_id: Option<ClientId>,
54 ctx: &impl StateroomContext,
55 ) {
56 if let Some(handle) = client_id.and_then(|id| self.client_connections.get_mut(&id)) {
57 handle.receive(&message);
58 } else {
59 self.timer_event_handle.receive(&message);
60 }
61
62 self.update_suspended_event(ctx);
63 }
64}
65
66impl<P: StateProgram> StateroomService for AperStateroomService<P>
67where
68 P::T: Unpin + Send + Sync + 'static,
69{
70 fn init(&mut self, ctx: &impl StateroomContext) {
71 self.update_suspended_event(ctx);
72 }
73
74 fn connect(&mut self, client_id: ClientId, ctx: &impl StateroomContext) {
75 let ctx = Clone::clone(ctx);
76 let callback = move |message: &MessageToClient| {
77 ctx.send_message(client_id, bincode::serialize(&message).unwrap());
78 };
79
80 let handle = self.connection.connect(callback);
81
82 self.client_connections.insert(client_id, handle);
83 }
84
85 fn disconnect(&mut self, user: ClientId, _ctx: &impl StateroomContext) {
86 self.client_connections.remove(&user);
87 }
88
89 fn message(
90 &mut self,
91 client_id: ClientId,
92 message: MessagePayload,
93 ctx: &impl StateroomContext,
94 ) {
95 match message {
96 MessagePayload::Text(txt) => {
97 let message: MessageToServer = serde_json::from_str(&txt).unwrap();
98 self.process_message(message, Some(client_id), ctx);
99 }
100 MessagePayload::Bytes(bytes) => {
101 let message: MessageToServer = bincode::deserialize(&bytes).unwrap();
102 self.process_message(message, Some(client_id), ctx);
103 }
104 }
105 }
106
107 fn timer(&mut self, ctx: &impl StateroomContext) {
108 if let Some(mut event) = self.suspended_event.take() {
109 event.timestamp = Utc::now();
110 let event = bincode::serialize(&event).unwrap();
111 self.process_message(
112 MessageToServer::Intent {
113 intent: event,
114 client_version: 0,
115 },
116 None,
117 ctx,
118 );
119 }
120 }
121}
122
123pub type Timestamp = DateTime<Utc>;
124
125#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
126pub struct IntentEvent<T>
127where
128 T: Unpin + Send + Sync + 'static + Clone,
129{
130 #[serde(with = "ts_milliseconds")]
131 pub timestamp: Timestamp,
132 pub client: Option<u32>,
133 pub intent: T,
134}
135
136impl<T> IntentEvent<T>
137where
138 T: Unpin + Send + Sync + 'static + Clone,
139{
140 pub fn new(client: Option<u32>, timestamp: Timestamp, intent: T) -> IntentEvent<T> {
141 IntentEvent {
142 timestamp,
143 client,
144 intent,
145 }
146 }
147}