puffin_http/server.rs
1use anyhow::Context as _;
2use puffin::{FrameSinkId, GlobalProfiler, ScopeCollection};
3use std::{
4 io::Write as _,
5 net::{SocketAddr, TcpListener, TcpStream},
6 sync::{
7 Arc,
8 atomic::{AtomicUsize, Ordering},
9 },
10};
11
12/// Maximum size of the backlog of packets to send to a client if they aren't reading fast enough.
13const MAX_FRAMES_IN_QUEUE: usize = 30;
14
15/// Listens for incoming connections
16/// and streams them puffin profiler data.
17///
18/// Drop to stop transmitting and listening for new connections.
19#[must_use = "When Server is dropped, the server is closed, so keep it around!"]
20pub struct Server {
21 sink_id: FrameSinkId,
22 join_handle: Option<std::thread::JoinHandle<()>>,
23 num_clients: Arc<AtomicUsize>,
24 sink_remove: fn(FrameSinkId) -> (),
25}
26
27impl Server {
28 /// Start listening for connections on this addr (e.g. "0.0.0.0:8585")
29 ///
30 /// Connects to the [`GlobalProfiler`]
31 ///
32 /// # Errors
33 ///
34 /// forward error from [`Self::new_custom`] call.
35 pub fn new(bind_addr: &str) -> anyhow::Result<Self> {
36 fn global_add(sink: puffin::FrameSink) -> FrameSinkId {
37 GlobalProfiler::lock().add_sink(sink)
38 }
39 fn global_remove(id: FrameSinkId) {
40 GlobalProfiler::lock().remove_sink(id);
41 }
42
43 Self::new_custom(bind_addr, global_add, global_remove)
44 }
45
46 /// Starts a new puffin server, with a custom function for installing the server's sink
47 ///
48 /// # Arguments
49 /// * `bind_addr` - The address to bind to, when listening for connections
50 /// (e.g. "localhost:8585" or "127.0.0.1:8585")
51 /// * `sink_install` - A function that installs the [Server]'s sink into
52 /// a [`GlobalProfiler`], and then returns the [`FrameSinkId`] so that the sink can be removed later
53 /// * `sink_remove` - A function that reverts `sink_install`.
54 /// This should be a call to remove the sink from the profiler ([`GlobalProfiler::remove_sink`])
55 ///
56 /// # Example
57 ///
58 /// Using this is slightly complicated, but it is possible to use this to set a custom profiler per-thread,
59 /// such that threads can be grouped together and profiled separately. E.g. you could have one profiling server
60 /// instance for the main UI loop, and another for the background worker loop, and events/frames from those thread(s)
61 /// would be completely separated. You can then hook up two separate instances of `puffin_viewer` and profile them separately.
62 ///
63 /// # Errors
64 ///
65 /// Will return an `io::Error` if the [`TcpListener::bind`] fail.
66 /// Will return an `io::Error` if the spawn of the thread ,for connection and data send management, fail.
67 ///
68 /// ## Per-Thread Profiling
69 /// ```
70 /// # use puffin::GlobalProfiler;
71 /// # use puffin::{StreamInfoRef, ThreadInfo, ScopeDetails};
72 /// # use puffin_http::Server;
73 /// # use puffin::ThreadProfiler;
74 /// #
75 /// # pub fn main() {
76 /// #
77 /// #
78 /// // Initialise the profiling server for the main app
79 /// let default_server = Server::new("localhost:8585").expect("failed to create default profiling server");
80 /// puffin::profile_scope!("main_scope");
81 ///
82 /// // Create a new [GlobalProfiler] instance. This is where we will be sending the events to for our threads.
83 /// // [OnceLock] and [Mutex] are there so that we can safely get exclusive mutable access.
84 /// static CUSTOM_PROFILER: std::sync::OnceLock<std::sync::Mutex<GlobalProfiler>> = std::sync::OnceLock::new();
85 /// // Helper function to access the profiler
86 /// fn get_custom_profiler() -> std::sync::MutexGuard<'static, GlobalProfiler> {
87 /// CUSTOM_PROFILER.get_or_init(|| std::sync::Mutex::new(GlobalProfiler::default()))
88 /// .lock().expect("failed to lock custom profiler")
89 /// }
90 /// // Create the custom profiling server that uses our custom profiler instead of the global/default one
91 /// let thread_server = Server::new_custom(
92 /// "localhost:6969",
93 /// // Adds the [Server]'s sink to our custom profiler
94 /// |sink| get_custom_profiler().add_sink(sink),
95 /// // Remove
96 /// |id| _ = get_custom_profiler().remove_sink(id)
97 /// );
98 ///
99 /// // Create some custom threads where we use the custom profiler and server
100 /// std::thread::scope(|scope| {
101 /// scope.spawn(move ||{
102 /// // Tell this thread to use the custom profiler
103 /// let _ = ThreadProfiler::initialize(
104 /// // Use the same time source as default puffin
105 /// puffin::now_ns,
106 /// // However redirect the events to our `custom_profiler`, instead of the default
107 /// // which would be the one returned by [GlobalProfiler::lock()]
108 /// |info: ThreadInfo, details: &[ScopeDetails], stream: &StreamInfoRef<'_>|
109 /// get_custom_profiler().report(info, details, stream)
110 /// );
111 ///
112 /// // Do work
113 /// {
114 /// puffin::profile_scope!("inside_thread");
115 /// println!("hello from the thread");
116 /// std::thread::sleep(std::time::Duration::from_secs(1));
117 /// }
118 ///
119 /// // Tell our profiler that we are done with this frame
120 /// // This will be sent to the server on port 6969
121 /// get_custom_profiler().new_frame();
122 /// });
123 /// });
124 ///
125 /// // New frame for the global profiler. This is completely separate from the scopes with the custom profiler
126 /// GlobalProfiler::lock().new_frame();
127 /// #
128 /// #
129 /// # }
130 /// ```
131 ///
132 /// ## Helpful Macro
133 /// ```rust
134 /// # use std::thread::sleep;
135 /// # use std::time::Duration;
136 ///
137 /// /// This macro makes it much easier to define profilers
138 /// ///
139 /// /// This macro makes use of the `pastey` crate to generate unique identifiers, and `tracing` to log events
140 /// macro_rules! profiler {
141 /// ($(
142 /// {name: $name:ident, port: $port:expr $(,install: |$install_var:ident| $install:block, drop: |$drop_var:ident| $drop:block)? $(,)?}
143 /// ),* $(,)?)
144 /// => {
145 /// $(
146 /// profiler!(@inner { name: $name, port: $port $(,install: |$install_var| $install, drop: |$drop_var| $drop)? });
147 /// )*
148 /// };
149 ///
150 /// (@inner { name: $name:ident, port: $port:expr }) => {
151 /// pastey::paste!{
152 /// #[doc = concat!("The address to bind the ", std::stringify!([< $name:lower >]), " thread profilers' server to")]
153 /// pub const [< $name:upper _PROFILER_ADDR >] : &'static str
154 /// = concat!("127.0.0.1:", $port);
155 ///
156 /// /// Installs the server's sink into the custom profiler
157 /// #[doc(hidden)]
158 /// fn [< $name:lower _profiler_server_install >](sink: puffin::FrameSink) -> puffin::FrameSinkId {
159 /// [< $name:lower _profiler_lock >]().add_sink(sink)
160 /// }
161 ///
162 /// /// Drops the server's sink and removes from profiler
163 /// #[doc(hidden)]
164 /// fn [< $name:lower _profiler_server_drop >](id: puffin::FrameSinkId){
165 /// [< $name:lower _profiler_lock >]().remove_sink(id);
166 /// }
167 ///
168 /// #[doc = concat!("The instance of the ", std::stringify!([< $name:lower >]), " thread profiler's server")]
169 /// pub static [< $name:upper _PROFILER_SERVER >] : std::sync::LazyLock<std::sync::Mutex<puffin_http::Server>>
170 /// = std::sync::LazyLock::new(|| {
171 /// eprintln!(
172 /// "starting puffin_http server for {} profiler at {}",
173 /// std::stringify!([<$name:lower>]),
174 /// [< $name:upper _PROFILER_ADDR >]
175 /// );
176 /// std::sync::Mutex::new(
177 /// puffin_http::Server::new_custom(
178 /// [< $name:upper _PROFILER_ADDR >],
179 /// // Can't use closures in a const context, use fn-pointers instead
180 /// [< $name:lower _profiler_server_install >],
181 /// [< $name:lower _profiler_server_drop >],
182 /// )
183 /// .expect(&format!("{} puffin_http server failed to start", std::stringify!([<$name:lower>])))
184 /// )
185 /// });
186 ///
187 /// #[doc = concat!("A custom reporter for the ", std::stringify!([< $name:lower >]), " thread reporter")]
188 /// pub fn [< $name:lower _profiler_reporter >] (info: puffin::ThreadInfo, details: &[puffin::ScopeDetails], stream: &puffin::StreamInfoRef<'_>) {
189 /// [< $name:lower _profiler_lock >]().report(info, details, stream)
190 /// }
191 ///
192 /// #[doc = concat!("Accessor for the ", std::stringify!([< $name:lower >]), " thread reporter")]
193 /// pub fn [< $name:lower _profiler_lock >]() -> std::sync::MutexGuard<'static, puffin::GlobalProfiler> {
194 /// static [< $name _PROFILER >] : std::sync::LazyLock<std::sync::Mutex<puffin::GlobalProfiler>> = std::sync::LazyLock::new(Default::default);
195 /// [< $name _PROFILER >].lock().expect("poisoned std::sync::mutex")
196 /// }
197 ///
198 /// #[doc = concat!("Initialises the ", std::stringify!([< $name:lower >]), " thread reporter and server.\
199 /// Call this on each different thread you want to register with this profiler")]
200 /// pub fn [< $name:lower _profiler_init >]() {
201 /// eprintln!("init thread profiler \"{}\"", std::stringify!([<$name:lower>]));
202 /// std::mem::drop([< $name:upper _PROFILER_SERVER >].lock());
203 /// eprintln!("set thread custom profiler \"{}\"", std::stringify!([<$name:lower>]));
204 /// puffin::ThreadProfiler::initialize(::puffin::now_ns, [< $name:lower _profiler_reporter >]);
205 /// }
206 /// }
207 /// };
208 /// }
209 ///
210 /// profiler! {
211 /// { name: UI, port: "2a" },
212 /// { name: RENDERER, port: 8586 },
213 /// { name: BACKGROUND, port: 8587 },
214 /// }
215 ///
216 /// pub fn demo() {
217 /// std::thread::spawn(|| {
218 /// // Initialise the custom profiler for this thread
219 /// // Now all puffin events are sent to the custom profiling server instead
220 /// //
221 /// background_profiler_init();
222 ///
223 /// for i in 0..100{
224 /// puffin::profile_scope!("test");
225 /// sleep(Duration::from_millis(i));
226 /// }
227 ///
228 /// // Mark a new frame so the data is flushed to the server
229 /// background_profiler_lock().new_frame();
230 /// });
231 /// }
232 /// ```
233 pub fn new_custom(
234 bind_addr: &str,
235 sink_install: fn(puffin::FrameSink) -> FrameSinkId,
236 sink_remove: fn(FrameSinkId) -> (),
237 ) -> anyhow::Result<Self> {
238 let tcp_listener = TcpListener::bind(bind_addr).context("binding server TCP socket")?;
239 tcp_listener
240 .set_nonblocking(true)
241 .context("TCP set_nonblocking")?;
242
243 // We use crossbeam_channel instead of `mpsc`,
244 // because on shutdown we want all frames to be sent.
245 // `mpsc::Receiver` stops receiving as soon as the `Sender` is dropped,
246 // but `crossbeam_channel` will continue until the channel is empty.
247 let (tx, rx): (crossbeam_channel::Sender<Arc<puffin::FrameData>>, _) =
248 crossbeam_channel::unbounded();
249
250 let num_clients = Arc::new(AtomicUsize::default());
251 let num_clients_cloned = Arc::clone(&num_clients);
252
253 let join_handle = std::thread::Builder::new()
254 .name("puffin-server".to_owned())
255 .spawn(move || {
256 let mut server_impl = PuffinServerImpl {
257 tcp_listener,
258 clients: Default::default(),
259 num_clients: num_clients_cloned,
260 send_all_scopes: false,
261 scope_collection: Default::default(),
262 };
263
264 while let Ok(frame) = rx.recv() {
265 if let Err(err) = server_impl.accept_new_clients() {
266 log::warn!("puffin server failure: {err}");
267 }
268
269 if let Err(err) = server_impl.send(&frame) {
270 log::warn!("puffin server failure: {err}");
271 }
272 }
273 })
274 .context("Couldn't spawn thread")?;
275
276 // Call the `install` function to add ourselves as a sink
277 let sink_id = sink_install(Box::new(move |frame| {
278 tx.send(frame).ok();
279 }));
280
281 Ok(Self {
282 sink_id,
283 join_handle: Some(join_handle),
284 num_clients,
285 sink_remove,
286 })
287 }
288
289 /// Number of clients currently connected.
290 pub fn num_clients(&self) -> usize {
291 self.num_clients.load(Ordering::SeqCst)
292 }
293}
294
295impl Drop for Server {
296 fn drop(&mut self) {
297 // Remove ourselves from the profiler
298 (self.sink_remove)(self.sink_id);
299
300 // Take care to send everything before we shut down:
301 if let Some(join_handle) = self.join_handle.take() {
302 join_handle.join().ok();
303 }
304 }
305}
306
307type Packet = Arc<[u8]>;
308
309struct Client {
310 client_addr: SocketAddr,
311 packet_tx: Option<crossbeam_channel::Sender<Packet>>,
312 join_handle: Option<std::thread::JoinHandle<()>>,
313}
314
315impl Drop for Client {
316 fn drop(&mut self) {
317 // Take care to send everything before we shut down!
318
319 // Drop the sender to signal to shut down:
320 self.packet_tx = None;
321
322 // Wait for the shutdown:
323 if let Some(join_handle) = self.join_handle.take() {
324 join_handle.join().ok();
325 }
326 }
327}
328
329/// Listens for incoming connections
330/// and streams them puffin profiler data.
331struct PuffinServerImpl {
332 tcp_listener: TcpListener,
333 clients: Vec<Client>,
334 num_clients: Arc<AtomicUsize>,
335 send_all_scopes: bool,
336 scope_collection: ScopeCollection,
337}
338
339impl PuffinServerImpl {
340 fn accept_new_clients(&mut self) -> anyhow::Result<()> {
341 loop {
342 match self.tcp_listener.accept() {
343 Ok((tcp_stream, client_addr)) => {
344 tcp_stream
345 .set_nonblocking(false)
346 .context("stream.set_nonblocking")?;
347
348 log::info!("{client_addr} connected");
349
350 let (packet_tx, packet_rx) = crossbeam_channel::bounded(MAX_FRAMES_IN_QUEUE);
351
352 let join_handle = std::thread::Builder::new()
353 .name("puffin-server-client".to_owned())
354 .spawn(move || client_loop(packet_rx, client_addr, tcp_stream))
355 .context("Couldn't spawn thread")?;
356
357 // Send all scopes when new client connects.
358 self.send_all_scopes = true;
359 self.clients.push(Client {
360 client_addr,
361 packet_tx: Some(packet_tx),
362 join_handle: Some(join_handle),
363 });
364 self.num_clients.store(self.clients.len(), Ordering::SeqCst);
365 }
366 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
367 break; // Nothing to do for now.
368 }
369 Err(err) => {
370 anyhow::bail!("puffin server TCP error: {err}");
371 }
372 }
373 }
374 Ok(())
375 }
376
377 pub fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> {
378 puffin::profile_function!();
379
380 // Keep scope_collection up-to-date
381 for new_scope in &frame.scope_delta {
382 self.scope_collection.insert(Arc::clone(new_scope));
383 }
384
385 // Nothing to send if no clients => Early return.
386 if self.clients.is_empty() {
387 return Ok(());
388 }
389
390 let mut packet = vec![];
391
392 packet
393 .write_all(&crate::PROTOCOL_VERSION.to_le_bytes())
394 .context("Encode puffin `PROTOCOL_VERSION` in packet to be send to client.")?;
395
396 let scope_collection = if self.send_all_scopes {
397 Some(&self.scope_collection)
398 } else {
399 None
400 };
401
402 frame
403 .write_into(scope_collection, &mut packet)
404 .context("Encode puffin frame")?;
405 self.send_all_scopes = false;
406
407 let packet: Packet = packet.into();
408
409 self.clients.retain(|client| match &client.packet_tx {
410 None => false,
411 Some(packet_tx) => match packet_tx.try_send(Arc::clone(&packet)) {
412 Ok(()) => true,
413 Err(crossbeam_channel::TrySendError::Disconnected(_)) => false,
414 Err(crossbeam_channel::TrySendError::Full(_)) => {
415 log::info!(
416 "puffin client {} is not accepting data fast enough; dropping a frame",
417 client.client_addr
418 );
419 true
420 }
421 },
422 });
423 self.num_clients.store(self.clients.len(), Ordering::SeqCst);
424
425 Ok(())
426 }
427}
428
429#[expect(clippy::needless_pass_by_value)]
430fn client_loop(
431 packet_rx: crossbeam_channel::Receiver<Packet>,
432 client_addr: SocketAddr,
433 mut tcp_stream: TcpStream,
434) {
435 while let Ok(packet) = packet_rx.recv() {
436 if let Err(err) = tcp_stream.write_all(&packet) {
437 log::info!(
438 "puffin server failed sending to {}: {} (kind: {:?})",
439 client_addr,
440 err,
441 err.kind()
442 );
443 break;
444 }
445 }
446}