iced_beacon/
lib.rs

1pub use iced_core as core;
2pub use semver::Version;
3
4pub mod client;
5pub mod span;
6
7mod error;
8mod stream;
9
10pub use client::Client;
11pub use span::Span;
12
13use crate::core::theme;
14use crate::core::time::{Duration, SystemTime};
15use crate::error::Error;
16use crate::span::present;
17
18use futures::{SinkExt, Stream};
19use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
20use tokio::net;
21use tokio::sync::mpsc;
22use tokio::task;
23
24#[derive(Debug, Clone)]
25pub struct Connection {
26    commands: mpsc::Sender<client::Command>,
27}
28
29impl Connection {
30    pub fn rewind_to<'a>(
31        &self,
32        message: usize,
33    ) -> impl Future<Output = ()> + 'a {
34        let commands = self.commands.clone();
35
36        async move {
37            let _ = commands.send(client::Command::RewindTo { message }).await;
38        }
39    }
40
41    pub fn go_live<'a>(&self) -> impl Future<Output = ()> + 'a {
42        let commands = self.commands.clone();
43
44        async move {
45            let _ = commands.send(client::Command::GoLive).await;
46        }
47    }
48}
49
50#[derive(Debug, Clone)]
51pub enum Event {
52    Connected {
53        connection: Connection,
54        at: SystemTime,
55        name: String,
56        version: Version,
57        theme: Option<theme::Palette>,
58        can_time_travel: bool,
59    },
60    Disconnected {
61        at: SystemTime,
62    },
63    ThemeChanged {
64        at: SystemTime,
65        palette: theme::Palette,
66    },
67    SpanFinished {
68        at: SystemTime,
69        duration: Duration,
70        span: Span,
71    },
72    QuitRequested {
73        at: SystemTime,
74    },
75    AlreadyRunning {
76        at: SystemTime,
77    },
78}
79
80impl Event {
81    pub fn at(&self) -> SystemTime {
82        match self {
83            Self::Connected { at, .. }
84            | Self::Disconnected { at, .. }
85            | Self::ThemeChanged { at, .. }
86            | Self::SpanFinished { at, .. }
87            | Self::QuitRequested { at }
88            | Self::AlreadyRunning { at } => *at,
89        }
90    }
91}
92
93pub fn is_running() -> bool {
94    std::net::TcpListener::bind(client::server_address_from_env()).is_err()
95}
96
97pub fn run() -> impl Stream<Item = Event> {
98    stream::channel(|mut output| async move {
99        let mut buffer = Vec::new();
100
101        let server = loop {
102            match net::TcpListener::bind(client::server_address_from_env())
103                .await
104            {
105                Ok(server) => break server,
106                Err(error) => {
107                    if error.kind() == io::ErrorKind::AddrInUse {
108                        let _ = output
109                            .send(Event::AlreadyRunning {
110                                at: SystemTime::now(),
111                            })
112                            .await;
113                    }
114                    delay().await;
115                }
116            };
117        };
118
119        loop {
120            let Ok((stream, _)) = server.accept().await else {
121                continue;
122            };
123
124            let (mut reader, mut writer) = {
125                let _ = stream.set_nodelay(true);
126                stream.into_split()
127            };
128
129            let (command_sender, mut command_receiver) = mpsc::channel(1);
130            let mut last_message = String::new();
131            let mut last_update_number = 0;
132            let mut last_tasks = 0;
133            let mut last_subscriptions = 0;
134            let mut last_present_layers = 0;
135            let mut last_prepare = present::Stage::default();
136            let mut last_render = present::Stage::default();
137
138            drop(task::spawn(async move {
139                let mut last_message_number = None;
140
141                while let Some(command) = command_receiver.recv().await {
142                    match command {
143                        client::Command::RewindTo { message } => {
144                            if Some(message) == last_message_number {
145                                continue;
146                            }
147
148                            last_message_number = Some(message);
149                        }
150                        client::Command::GoLive => {
151                            last_message_number = None;
152                        }
153                    }
154
155                    let _ =
156                        send(&mut writer, command).await.inspect_err(|error| {
157                            log::error!("Error when sending command: {error}")
158                        });
159                }
160            }));
161
162            loop {
163                match receive(&mut reader, &mut buffer).await {
164                    Ok(message) => {
165                        match message {
166                            client::Message::Connected {
167                                at,
168                                name,
169                                version,
170                                theme,
171                                can_time_travel,
172                            } => {
173                                let _ = output
174                                    .send(Event::Connected {
175                                        connection: Connection {
176                                            commands: command_sender.clone(),
177                                        },
178                                        at,
179                                        name,
180                                        version,
181                                        theme,
182                                        can_time_travel,
183                                    })
184                                    .await;
185                            }
186                            client::Message::EventLogged { at, event } => {
187                                match event {
188                                    client::Event::ThemeChanged(palette) => {
189                                        let _ = output
190                                            .send(Event::ThemeChanged {
191                                                at,
192                                                palette,
193                                            })
194                                            .await;
195                                    }
196                                    client::Event::SubscriptionsTracked(
197                                        amount_alive,
198                                    ) => {
199                                        last_subscriptions = amount_alive;
200                                    }
201                                    client::Event::MessageLogged {
202                                        number,
203                                        message,
204                                    } => {
205                                        last_update_number = number;
206                                        last_message = message;
207                                    }
208                                    client::Event::CommandsSpawned(
209                                        commands,
210                                    ) => {
211                                        last_tasks = commands;
212                                    }
213                                    client::Event::LayersRendered(layers) => {
214                                        last_present_layers = layers;
215                                    }
216                                    client::Event::SpanStarted(
217                                        span::Stage::Update,
218                                    ) => {
219                                        last_message.clear();
220                                        last_tasks = 0;
221                                    }
222                                    client::Event::SpanStarted(_) => {}
223                                    client::Event::SpanFinished(
224                                        stage,
225                                        duration,
226                                    ) => {
227                                        let span = match stage {
228                                            span::Stage::Boot => Span::Boot,
229                                            span::Stage::Update => {
230                                                Span::Update {
231                                                    number: last_update_number,
232                                                    message: last_message
233                                                        .clone(),
234                                                    tasks: last_tasks,
235                                                    subscriptions:
236                                                        last_subscriptions,
237                                                }
238                                            }
239                                            span::Stage::View(window) => {
240                                                Span::View { window }
241                                            }
242                                            span::Stage::Layout(window) => {
243                                                Span::Layout { window }
244                                            }
245                                            span::Stage::Interact(window) => {
246                                                Span::Interact { window }
247                                            }
248                                            span::Stage::Draw(window) => {
249                                                Span::Draw { window }
250                                            }
251                                            span::Stage::Prepare(primitive)
252                                            | span::Stage::Render(primitive) => {
253                                                let stage = if matches!(
254                                                    stage,
255                                                    span::Stage::Prepare(_),
256                                                ) {
257                                                    &mut last_prepare
258                                                } else {
259                                                    &mut last_render
260                                                };
261
262                                                let primitive = match primitive {
263                                                    present::Primitive::Quad => &mut stage.quads,
264                                                    present::Primitive::Triangle => &mut stage.triangles,
265                                                    present::Primitive::Shader => &mut stage.shaders,
266                                                    present::Primitive::Text => &mut stage.text,
267                                                    present::Primitive::Image => &mut stage.images,
268                                                };
269
270                                                *primitive += duration;
271
272                                                continue;
273                                            }
274                                            span::Stage::Present(window) => {
275                                                let span = Span::Present {
276                                                    window,
277                                                    prepare: last_prepare,
278                                                    render: last_render,
279                                                    layers: last_present_layers,
280                                                };
281
282                                                last_prepare =
283                                                    present::Stage::default();
284                                                last_render =
285                                                    present::Stage::default();
286                                                last_present_layers = 0;
287
288                                                span
289                                            }
290                                            span::Stage::Custom(name) => {
291                                                Span::Custom { name }
292                                            }
293                                        };
294
295                                        let _ = output
296                                            .send(Event::SpanFinished {
297                                                at,
298                                                duration,
299                                                span,
300                                            })
301                                            .await;
302                                    }
303                                }
304                            }
305                            client::Message::Quit { at } => {
306                                let _ = output
307                                    .send(Event::QuitRequested { at })
308                                    .await;
309                            }
310                        };
311                    }
312                    Err(Error::IOFailed(_)) => {
313                        let _ = output
314                            .send(Event::Disconnected {
315                                at: SystemTime::now(),
316                            })
317                            .await;
318                        break;
319                    }
320                    Err(Error::DecodingFailed(error)) => {
321                        log::warn!("Error decoding beacon output: {error}")
322                    }
323                }
324            }
325        }
326    })
327}
328
329async fn receive(
330    stream: &mut net::tcp::OwnedReadHalf,
331    buffer: &mut Vec<u8>,
332) -> Result<client::Message, Error> {
333    let size = stream.read_u64().await? as usize;
334
335    if buffer.len() < size {
336        buffer.resize(size, 0);
337    }
338
339    let _n = stream.read_exact(&mut buffer[..size]).await?;
340
341    Ok(bincode::deserialize(buffer)?)
342}
343
344async fn send(
345    stream: &mut net::tcp::OwnedWriteHalf,
346    command: client::Command,
347) -> Result<(), io::Error> {
348    let bytes = bincode::serialize(&command).expect("Encode input message");
349    let size = bytes.len() as u64;
350
351    stream.write_all(&size.to_be_bytes()).await?;
352    stream.write_all(&bytes).await?;
353    stream.flush().await?;
354
355    Ok(())
356}
357
358async fn delay() {
359    tokio::time::sleep(Duration::from_secs(2)).await;
360}