1pub use iced_core as core;
2pub use semver::Version;
3
4pub mod client;
5pub mod span;
6
7mod error;
8mod stream;
9
10pub use client::Client;
11pub use span::Span;
12
13use crate::core::theme;
14use crate::core::time::{Duration, SystemTime};
15use crate::error::Error;
16use crate::span::present;
17
18use futures::{SinkExt, Stream};
19use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
20use tokio::net;
21use tokio::sync::mpsc;
22use tokio::task;
23
24#[derive(Debug, Clone)]
25pub struct Connection {
26 commands: mpsc::Sender<client::Command>,
27}
28
29impl Connection {
30 pub fn rewind_to<'a>(
31 &self,
32 message: usize,
33 ) -> impl Future<Output = ()> + 'a {
34 let commands = self.commands.clone();
35
36 async move {
37 let _ = commands.send(client::Command::RewindTo { message }).await;
38 }
39 }
40
41 pub fn go_live<'a>(&self) -> impl Future<Output = ()> + 'a {
42 let commands = self.commands.clone();
43
44 async move {
45 let _ = commands.send(client::Command::GoLive).await;
46 }
47 }
48}
49
50#[derive(Debug, Clone)]
51pub enum Event {
52 Connected {
53 connection: Connection,
54 at: SystemTime,
55 name: String,
56 version: Version,
57 theme: Option<theme::Palette>,
58 can_time_travel: bool,
59 },
60 Disconnected {
61 at: SystemTime,
62 },
63 ThemeChanged {
64 at: SystemTime,
65 palette: theme::Palette,
66 },
67 SpanFinished {
68 at: SystemTime,
69 duration: Duration,
70 span: Span,
71 },
72 QuitRequested {
73 at: SystemTime,
74 },
75 AlreadyRunning {
76 at: SystemTime,
77 },
78}
79
80impl Event {
81 pub fn at(&self) -> SystemTime {
82 match self {
83 Self::Connected { at, .. }
84 | Self::Disconnected { at, .. }
85 | Self::ThemeChanged { at, .. }
86 | Self::SpanFinished { at, .. }
87 | Self::QuitRequested { at }
88 | Self::AlreadyRunning { at } => *at,
89 }
90 }
91}
92
93pub fn is_running() -> bool {
94 std::net::TcpListener::bind(client::server_address_from_env()).is_err()
95}
96
/// Runs the server side of the protocol and returns its [`Event`]s as a
/// stream.
///
/// The body is handed to `stream::channel`, which presumably turns everything
/// sent on `output` into stream items (confirm in the `stream` module). It:
///
/// 1. Binds a TCP listener on `client::server_address_from_env()`, retrying
///    after [`delay`] on failure and emitting [`Event::AlreadyRunning`] when
///    the failure is `AddrInUse`.
/// 2. Accepts clients one at a time; the next `accept` only happens after the
///    current client's read loop ends.
/// 3. For each client, spawns a detached task that writes commands received
///    through the [`Connection`] handle, and reads messages in a loop,
///    translating them into [`Event`]s.
///
/// Errors when sending on `output` are ignored throughout.
pub fn run() -> impl Stream<Item = Event> {
    stream::channel(|mut output| async move {
        // Scratch buffer for incoming frames; reused across all connections.
        let mut buffer = Vec::new();

        // Keep trying to bind until it works.
        let server = loop {
            match net::TcpListener::bind(client::server_address_from_env())
                .await
            {
                Ok(server) => break server,
                Err(error) => {
                    // Only "address in use" is reported; other bind errors
                    // are retried silently.
                    if error.kind() == io::ErrorKind::AddrInUse {
                        let _ = output
                            .send(Event::AlreadyRunning {
                                at: SystemTime::now(),
                            })
                            .await;
                    }
                    delay().await;
                }
            };
        };

        loop {
            // A failed accept is retried immediately.
            let Ok((stream, _)) = server.accept().await else {
                continue;
            };

            let (mut reader, mut writer) = {
                // Best effort: a failure to set TCP_NODELAY is ignored.
                let _ = stream.set_nodelay(true);
                stream.into_split()
            };

            // Per-connection command channel (capacity 1) and the state
            // accumulated between client events until a span finishes.
            let (command_sender, mut command_receiver) = mpsc::channel(1);
            let mut last_message = String::new();
            let mut last_update_number = 0;
            let mut last_tasks = 0;
            let mut last_subscriptions = 0;
            let mut last_present_layers = 0;
            let mut last_prepare = present::Stage::default();
            let mut last_render = present::Stage::default();

            // Writer task: detached (its join handle is dropped) and runs
            // until `recv` yields `None`.
            drop(task::spawn(async move {
                // Target of the most recently forwarded rewind, if any.
                let mut last_message_number = None;

                while let Some(command) = command_receiver.recv().await {
                    match command {
                        client::Command::RewindTo { message } => {
                            // Skip a rewind to the message we already
                            // rewound to.
                            if Some(message) == last_message_number {
                                continue;
                            }

                            last_message_number = Some(message);
                        }
                        client::Command::GoLive => {
                            last_message_number = None;
                        }
                    }

                    // Write failures are logged; the task keeps running.
                    let _ =
                        send(&mut writer, command).await.inspect_err(|error| {
                            log::error!("Error when sending command: {error}")
                        });
                }
            }));

            // Read loop for the current client.
            loop {
                match receive(&mut reader, &mut buffer).await {
                    Ok(message) => {
                        match message {
                            client::Message::Connected {
                                at,
                                name,
                                version,
                                theme,
                                can_time_travel,
                            } => {
                                let _ = output
                                    .send(Event::Connected {
                                        connection: Connection {
                                            commands: command_sender.clone(),
                                        },
                                        at,
                                        name,
                                        version,
                                        theme,
                                        can_time_travel,
                                    })
                                    .await;
                            }
                            client::Message::EventLogged { at, event } => {
                                match event {
                                    client::Event::ThemeChanged(palette) => {
                                        let _ = output
                                            .send(Event::ThemeChanged {
                                                at,
                                                palette,
                                            })
                                            .await;
                                    }
                                    // The next four events only update state
                                    // that is attached to a later span.
                                    client::Event::SubscriptionsTracked(
                                        amount_alive,
                                    ) => {
                                        last_subscriptions = amount_alive;
                                    }
                                    client::Event::MessageLogged {
                                        number,
                                        message,
                                    } => {
                                        last_update_number = number;
                                        last_message = message;
                                    }
                                    client::Event::CommandsSpawned(
                                        commands,
                                    ) => {
                                        last_tasks = commands;
                                    }
                                    client::Event::LayersRendered(layers) => {
                                        last_present_layers = layers;
                                    }
                                    // A starting update resets the
                                    // per-update message and task count.
                                    client::Event::SpanStarted(
                                        span::Stage::Update,
                                    ) => {
                                        last_message.clear();
                                        last_tasks = 0;
                                    }
                                    client::Event::SpanStarted(_) => {}
                                    client::Event::SpanFinished(
                                        stage,
                                        duration,
                                    ) => {
                                        // Build the public `Span` from the
                                        // stage plus accumulated state.
                                        let span = match stage {
                                            span::Stage::Boot => Span::Boot,
                                            span::Stage::Update => {
                                                Span::Update {
                                                    number: last_update_number,
                                                    message: last_message
                                                        .clone(),
                                                    tasks: last_tasks,
                                                    subscriptions:
                                                        last_subscriptions,
                                                }
                                            }
                                            span::Stage::View(window) => {
                                                Span::View { window }
                                            }
                                            span::Stage::Layout(window) => {
                                                Span::Layout { window }
                                            }
                                            span::Stage::Interact(window) => {
                                                Span::Interact { window }
                                            }
                                            span::Stage::Draw(window) => {
                                                Span::Draw { window }
                                            }
                                            // Prepare/render spans emit no
                                            // event of their own: their
                                            // durations are added up per
                                            // primitive and reported with the
                                            // next `Present` span.
                                            span::Stage::Prepare(primitive)
                                            | span::Stage::Render(primitive) => {
                                                let stage = if matches!(
                                                    stage,
                                                    span::Stage::Prepare(_),
                                                ) {
                                                    &mut last_prepare
                                                } else {
                                                    &mut last_render
                                                };

                                                let primitive = match primitive {
                                                    present::Primitive::Quad => &mut stage.quads,
                                                    present::Primitive::Triangle => &mut stage.triangles,
                                                    present::Primitive::Shader => &mut stage.shaders,
                                                    present::Primitive::Text => &mut stage.text,
                                                    present::Primitive::Image => &mut stage.images,
                                                };

                                                *primitive += duration;

                                                // Back to the read loop
                                                // without sending an event.
                                                continue;
                                            }
                                            span::Stage::Present(window) => {
                                                let span = Span::Present {
                                                    window,
                                                    prepare: last_prepare,
                                                    render: last_render,
                                                    layers: last_present_layers,
                                                };

                                                // Start the next frame's
                                                // accumulation from scratch.
                                                last_prepare =
                                                    present::Stage::default();
                                                last_render =
                                                    present::Stage::default();
                                                last_present_layers = 0;

                                                span
                                            }
                                            span::Stage::Custom(name) => {
                                                Span::Custom { name }
                                            }
                                        };

                                        let _ = output
                                            .send(Event::SpanFinished {
                                                at,
                                                duration,
                                                span,
                                            })
                                            .await;
                                    }
                                }
                            }
                            client::Message::Quit { at } => {
                                let _ = output
                                    .send(Event::QuitRequested { at })
                                    .await;
                            }
                        };
                    }
                    // An I/O failure ends this connection: report it and go
                    // back to accepting the next client.
                    Err(Error::IOFailed(_)) => {
                        let _ = output
                            .send(Event::Disconnected {
                                at: SystemTime::now(),
                            })
                            .await;
                        break;
                    }
                    // A decode failure is logged and reading continues.
                    Err(Error::DecodingFailed(error)) => {
                        log::warn!("Error decoding beacon output: {error}")
                    }
                }
            }
        }
    })
}
328
329async fn receive(
330 stream: &mut net::tcp::OwnedReadHalf,
331 buffer: &mut Vec<u8>,
332) -> Result<client::Message, Error> {
333 let size = stream.read_u64().await? as usize;
334
335 if buffer.len() < size {
336 buffer.resize(size, 0);
337 }
338
339 let _n = stream.read_exact(&mut buffer[..size]).await?;
340
341 Ok(bincode::deserialize(buffer)?)
342}
343
344async fn send(
345 stream: &mut net::tcp::OwnedWriteHalf,
346 command: client::Command,
347) -> Result<(), io::Error> {
348 let bytes = bincode::serialize(&command).expect("Encode input message");
349 let size = bytes.len() as u64;
350
351 stream.write_all(&size.to_be_bytes()).await?;
352 stream.write_all(&bytes).await?;
353 stream.flush().await?;
354
355 Ok(())
356}
357
358async fn delay() {
359 tokio::time::sleep(Duration::from_secs(2)).await;
360}