// puffin_http/server.rs
1use anyhow::Context as _;
2use puffin::{FrameSinkId, GlobalProfiler, ScopeCollection};
3use std::{
4    io::Write as _,
5    net::{SocketAddr, TcpListener, TcpStream},
6    sync::{
7        Arc,
8        atomic::{AtomicUsize, Ordering},
9    },
10};
11
/// Maximum number of packets queued per client if it isn't reading fast enough.
///
/// When a client's queue is full, new frames are dropped for that client
/// (see `PuffinServerImpl::send`) rather than blocking the server.
const MAX_FRAMES_IN_QUEUE: usize = 30;
14
/// Listens for incoming connections
/// and streams them puffin profiler data.
///
/// Drop to stop transmitting and listening for new connections.
#[must_use = "When Server is dropped, the server is closed, so keep it around!"]
pub struct Server {
    // Id of the sink installed into the profiler; passed to `sink_remove` on drop.
    sink_id: FrameSinkId,
    // Handle of the "puffin-server" thread; joined on drop so queued frames get flushed.
    join_handle: Option<std::thread::JoinHandle<()>>,
    // Count of currently connected clients, shared with the server thread.
    num_clients: Arc<AtomicUsize>,
    // Reverts the sink installation performed at construction (see `new_custom`).
    sink_remove: fn(FrameSinkId) -> (),
}
26
27impl Server {
28    /// Start listening for connections on this addr (e.g. "0.0.0.0:8585")
29    ///
30    /// Connects to the [`GlobalProfiler`]
31    ///
32    /// # Errors
33    ///
34    /// forward error from [`Self::new_custom`] call.
35    pub fn new(bind_addr: &str) -> anyhow::Result<Self> {
36        fn global_add(sink: puffin::FrameSink) -> FrameSinkId {
37            GlobalProfiler::lock().add_sink(sink)
38        }
39        fn global_remove(id: FrameSinkId) {
40            GlobalProfiler::lock().remove_sink(id);
41        }
42
43        Self::new_custom(bind_addr, global_add, global_remove)
44    }
45
46    /// Starts a new puffin server, with a custom function for installing the server's sink
47    ///
48    /// # Arguments
49    /// * `bind_addr` - The address to bind to, when listening for connections
50    ///   (e.g. "localhost:8585" or "127.0.0.1:8585")
51    /// * `sink_install` - A function that installs the [Server]'s sink into
52    ///   a [`GlobalProfiler`], and then returns the [`FrameSinkId`] so that the sink can be removed later
53    /// * `sink_remove` - A function that reverts `sink_install`.
54    ///   This should be a call to remove the sink from the profiler ([`GlobalProfiler::remove_sink`])
55    ///
56    /// # Example
57    ///
58    /// Using this is slightly complicated, but it is possible to use this to set a custom profiler per-thread,
59    /// such that threads can be grouped together and profiled separately. E.g. you could have one profiling server
60    /// instance for the main UI loop, and another for the background worker loop, and events/frames from those thread(s)
61    /// would be completely separated. You can then hook up two separate instances of `puffin_viewer` and profile them separately.
62    ///
63    /// # Errors
64    ///
65    /// Will return an `io::Error` if the [`TcpListener::bind`] fail.
66    /// Will return an `io::Error` if the spawn of the thread ,for connection and data send management, fail.
67    ///
68    /// ## Per-Thread Profiling
69    /// ```
70    /// # use puffin::GlobalProfiler;
71    /// # use puffin::{StreamInfoRef, ThreadInfo, ScopeDetails};
72    /// # use puffin_http::Server;
73    /// # use puffin::ThreadProfiler;
74    /// #
75    /// # pub fn main() {
76    /// #
77    /// #
78    /// // Initialise the profiling server for the main app
79    /// let default_server = Server::new("localhost:8585").expect("failed to create default profiling server");
80    /// puffin::profile_scope!("main_scope");
81    ///
82    /// // Create a new [GlobalProfiler] instance. This is where we will be sending the events to for our threads.
83    /// // [OnceLock] and [Mutex] are there so that we can safely get exclusive mutable access.
84    /// static CUSTOM_PROFILER: std::sync::OnceLock<std::sync::Mutex<GlobalProfiler>> = std::sync::OnceLock::new();
85    /// // Helper function to access the profiler
86    /// fn get_custom_profiler() -> std::sync::MutexGuard<'static, GlobalProfiler> {
87    ///    CUSTOM_PROFILER.get_or_init(|| std::sync::Mutex::new(GlobalProfiler::default()))
88    ///         .lock().expect("failed to lock custom profiler")
89    /// }
90    /// // Create the custom profiling server that uses our custom profiler instead of the global/default one
91    /// let thread_server = Server::new_custom(
92    ///     "localhost:6969",
93    ///     // Adds the [Server]'s sink to our custom profiler
94    ///     |sink| get_custom_profiler().add_sink(sink),
95    ///     // Remove
96    ///     |id| _ = get_custom_profiler().remove_sink(id)
97    /// );
98    ///
99    /// // Create some custom threads where we use the custom profiler and server
100    /// std::thread::scope(|scope| {
101    ///     scope.spawn(move ||{
102    ///         // Tell this thread to use the custom profiler
103    ///         let _ = ThreadProfiler::initialize(
104    ///             // Use the same time source as default puffin
105    ///             puffin::now_ns,
106    ///             // However redirect the events to our `custom_profiler`, instead of the default
107    ///             // which would be the one returned by [GlobalProfiler::lock()]
108    ///             |info: ThreadInfo, details: &[ScopeDetails], stream: &StreamInfoRef<'_>|
109    ///                 get_custom_profiler().report(info, details, stream)
110    ///         );
111    ///
112    ///         // Do work
113    ///         {
114    ///             puffin::profile_scope!("inside_thread");
115    ///             println!("hello from the thread");
116    ///             std::thread::sleep(std::time::Duration::from_secs(1));
117    ///         }
118    ///
119    ///         // Tell our profiler that we are done with this frame
120    ///         // This will be sent to the server on port 6969
121    ///         get_custom_profiler().new_frame();
122    ///     });
123    /// });
124    ///
125    /// // New frame for the global profiler. This is completely separate from the scopes with the custom profiler
126    /// GlobalProfiler::lock().new_frame();
127    /// #
128    /// #
129    /// # }
130    /// ```
131    ///
132    /// ## Helpful Macro
133    /// ```rust
134    /// # use std::thread::sleep;
135    /// # use std::time::Duration;
136    ///
137    /// /// This macro makes it much easier to define profilers
138    /// ///
139    /// /// This macro makes use of the `pastey` crate to generate unique identifiers, and `tracing` to log events
140    /// macro_rules! profiler {
141    ///     ($(
142    ///          {name: $name:ident, port: $port:expr $(,install: |$install_var:ident| $install:block, drop: |$drop_var:ident| $drop:block)? $(,)?}
143    ///      ),* $(,)?)
144    ///     => {
145    ///         $(
146    ///             profiler!(@inner { name: $name, port: $port $(,install: |$install_var| $install, drop: |$drop_var| $drop)? });
147    ///         )*
148    ///     };
149    ///
150    ///     (@inner { name: $name:ident, port: $port:expr }) => {
151    ///         pastey::paste!{
152    ///             #[doc = concat!("The address to bind the ", std::stringify!([< $name:lower >]), " thread profilers' server to")]
153    ///                 pub const [< $name:upper _PROFILER_ADDR >] : &'static str
154    ///                     = concat!("127.0.0.1:", $port);
155    ///
156    ///                 /// Installs the server's sink into the custom profiler
157    ///                 #[doc(hidden)]
158    ///                 fn [< $name:lower _profiler_server_install >](sink: puffin::FrameSink) -> puffin::FrameSinkId {
159    ///                     [< $name:lower _profiler_lock >]().add_sink(sink)
160    ///                 }
161    ///
162    ///                 /// Drops the server's sink and removes from profiler
163    ///                 #[doc(hidden)]
164    ///                 fn [< $name:lower _profiler_server_drop >](id: puffin::FrameSinkId){
165    ///                     [< $name:lower _profiler_lock >]().remove_sink(id);
166    ///                 }
167    ///
168    ///                 #[doc = concat!("The instance of the ", std::stringify!([< $name:lower >]), " thread profiler's server")]
169    ///                 pub static [< $name:upper _PROFILER_SERVER >] : std::sync::LazyLock<std::sync::Mutex<puffin_http::Server>>
170    ///                     = std::sync::LazyLock::new(|| {
171    ///                         eprintln!(
172    ///                             "starting puffin_http server for {} profiler at {}",
173    ///                             std::stringify!([<$name:lower>]),
174    ///                             [< $name:upper _PROFILER_ADDR >]
175    ///                         );
176    ///                         std::sync::Mutex::new(
177    ///                             puffin_http::Server::new_custom(
178    ///                                 [< $name:upper _PROFILER_ADDR >],
179    ///                                 // Can't use closures in a const context, use fn-pointers instead
180    ///                                 [< $name:lower _profiler_server_install >],
181    ///                                 [< $name:lower _profiler_server_drop >],
182    ///                             )
183    ///                             .expect(&format!("{} puffin_http server failed to start", std::stringify!([<$name:lower>])))
184    ///                         )
185    ///                     });
186    ///
187    ///                 #[doc = concat!("A custom reporter for the ", std::stringify!([< $name:lower >]), " thread reporter")]
188    ///                 pub fn [< $name:lower _profiler_reporter >] (info: puffin::ThreadInfo, details: &[puffin::ScopeDetails],  stream: &puffin::StreamInfoRef<'_>) {
189    ///                     [< $name:lower _profiler_lock >]().report(info, details, stream)
190    ///                 }
191    ///
192    ///                 #[doc = concat!("Accessor for the ", std::stringify!([< $name:lower >]), " thread reporter")]
193    ///                 pub fn [< $name:lower _profiler_lock >]() -> std::sync::MutexGuard<'static, puffin::GlobalProfiler> {
194    ///                     static [< $name _PROFILER >] : std::sync::LazyLock<std::sync::Mutex<puffin::GlobalProfiler>> = std::sync::LazyLock::new(Default::default);
195    ///                     [< $name _PROFILER >].lock().expect("poisoned std::sync::mutex")
196    ///                 }
197    ///
198    ///                 #[doc = concat!("Initialises the ", std::stringify!([< $name:lower >]), " thread reporter and server.\
199    ///                 Call this on each different thread you want to register with this profiler")]
200    ///                 pub fn [< $name:lower _profiler_init >]() {
201    ///                     eprintln!("init thread profiler \"{}\"", std::stringify!([<$name:lower>]));
202    ///                     std::mem::drop([< $name:upper _PROFILER_SERVER >].lock());
203    ///                     eprintln!("set thread custom profiler \"{}\"", std::stringify!([<$name:lower>]));
204    ///                     puffin::ThreadProfiler::initialize(::puffin::now_ns, [< $name:lower _profiler_reporter >]);
205    ///                 }
206    ///         }
207    ///     };
208    /// }
209    ///
210    /// profiler! {
211    ///     { name: UI,          port: "2a" },
212    ///     { name: RENDERER,    port: 8586 },
213    ///     { name: BACKGROUND,  port: 8587 },
214    /// }
215    ///
216    /// pub fn demo() {
217    ///     std::thread::spawn(|| {
218    ///         // Initialise the custom profiler for this thread
219    ///         // Now all puffin events are sent to the custom profiling server instead
220    ///         //
221    ///         background_profiler_init();
222    ///
223    ///         for i in 0..100{
224    ///             puffin::profile_scope!("test");
225    ///             sleep(Duration::from_millis(i));
226    ///         }
227    ///
228    ///         // Mark a new frame so the data is flushed to the server
229    ///         background_profiler_lock().new_frame();
230    ///     });
231    /// }
232    /// ```
233    pub fn new_custom(
234        bind_addr: &str,
235        sink_install: fn(puffin::FrameSink) -> FrameSinkId,
236        sink_remove: fn(FrameSinkId) -> (),
237    ) -> anyhow::Result<Self> {
238        let tcp_listener = TcpListener::bind(bind_addr).context("binding server TCP socket")?;
239        tcp_listener
240            .set_nonblocking(true)
241            .context("TCP set_nonblocking")?;
242
243        // We use crossbeam_channel instead of `mpsc`,
244        // because on shutdown we want all frames to be sent.
245        // `mpsc::Receiver` stops receiving as soon as the `Sender` is dropped,
246        // but `crossbeam_channel` will continue until the channel is empty.
247        let (tx, rx): (crossbeam_channel::Sender<Arc<puffin::FrameData>>, _) =
248            crossbeam_channel::unbounded();
249
250        let num_clients = Arc::new(AtomicUsize::default());
251        let num_clients_cloned = Arc::clone(&num_clients);
252
253        let join_handle = std::thread::Builder::new()
254            .name("puffin-server".to_owned())
255            .spawn(move || {
256                let mut server_impl = PuffinServerImpl {
257                    tcp_listener,
258                    clients: Default::default(),
259                    num_clients: num_clients_cloned,
260                    send_all_scopes: false,
261                    scope_collection: Default::default(),
262                };
263
264                while let Ok(frame) = rx.recv() {
265                    if let Err(err) = server_impl.accept_new_clients() {
266                        log::warn!("puffin server failure: {err}");
267                    }
268
269                    if let Err(err) = server_impl.send(&frame) {
270                        log::warn!("puffin server failure: {err}");
271                    }
272                }
273            })
274            .context("Couldn't spawn thread")?;
275
276        // Call the `install` function to add ourselves as a sink
277        let sink_id = sink_install(Box::new(move |frame| {
278            tx.send(frame).ok();
279        }));
280
281        Ok(Self {
282            sink_id,
283            join_handle: Some(join_handle),
284            num_clients,
285            sink_remove,
286        })
287    }
288
289    /// Number of clients currently connected.
290    pub fn num_clients(&self) -> usize {
291        self.num_clients.load(Ordering::SeqCst)
292    }
293}
294
295impl Drop for Server {
296    fn drop(&mut self) {
297        // Remove ourselves from the profiler
298        (self.sink_remove)(self.sink_id);
299
300        // Take care to send everything before we shut down:
301        if let Some(join_handle) = self.join_handle.take() {
302            join_handle.join().ok();
303        }
304    }
305}
306
/// One encoded message, cheaply cloneable for fan-out to every client thread.
type Packet = Arc<[u8]>;
308
/// A connected client and the thread that streams packets to it.
struct Client {
    // Peer address, used for log messages.
    client_addr: SocketAddr,
    // Bounded queue of packets for this client; set to `None` to signal shutdown.
    packet_tx: Option<crossbeam_channel::Sender<Packet>>,
    // Handle of the per-client sender thread; joined on drop.
    join_handle: Option<std::thread::JoinHandle<()>>,
}
314
315impl Drop for Client {
316    fn drop(&mut self) {
317        // Take care to send everything before we shut down!
318
319        // Drop the sender to signal to shut down:
320        self.packet_tx = None;
321
322        // Wait for the shutdown:
323        if let Some(join_handle) = self.join_handle.take() {
324            join_handle.join().ok();
325        }
326    }
327}
328
/// Listens for incoming connections
/// and streams them puffin profiler data.
struct PuffinServerImpl {
    // Non-blocking listener, polled for new connections once per frame.
    tcp_listener: TcpListener,
    // All currently connected clients.
    clients: Vec<Client>,
    // Mirror of `clients.len()`, shared with the public `Server` handle.
    num_clients: Arc<AtomicUsize>,
    // When true, the next packet includes the full scope collection
    // (set whenever a new client connects).
    send_all_scopes: bool,
    // Accumulated scope metadata from all frames seen so far.
    scope_collection: ScopeCollection,
}
338
impl PuffinServerImpl {
    /// Accepts all pending connections on the non-blocking listener,
    /// spawning one sender thread per new client.
    ///
    /// Returns `Ok` when the listener reports `WouldBlock` (no more pending
    /// connections); any other accept error aborts with a failure.
    fn accept_new_clients(&mut self) -> anyhow::Result<()> {
        loop {
            match self.tcp_listener.accept() {
                Ok((tcp_stream, client_addr)) => {
                    // The accepted stream inherits non-blocking mode from the
                    // listener; switch it back to blocking for the writer thread.
                    tcp_stream
                        .set_nonblocking(false)
                        .context("stream.set_nonblocking")?;

                    log::info!("{client_addr} connected");

                    // Bounded queue: if the client falls behind, `send` drops
                    // frames instead of buffering without limit.
                    let (packet_tx, packet_rx) = crossbeam_channel::bounded(MAX_FRAMES_IN_QUEUE);

                    let join_handle = std::thread::Builder::new()
                        .name("puffin-server-client".to_owned())
                        .spawn(move || client_loop(packet_rx, client_addr, tcp_stream))
                        .context("Couldn't spawn thread")?;

                    // Send all scopes when new client connects.
                    self.send_all_scopes = true;
                    self.clients.push(Client {
                        client_addr,
                        packet_tx: Some(packet_tx),
                        join_handle: Some(join_handle),
                    });
                    self.num_clients.store(self.clients.len(), Ordering::SeqCst);
                }
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                    break; // Nothing to do for now.
                }
                Err(err) => {
                    anyhow::bail!("puffin server TCP error: {err}");
                }
            }
        }
        Ok(())
    }

    /// Encodes `frame` into a packet and queues it on every connected client.
    ///
    /// Also folds the frame's `scope_delta` into `scope_collection`, and sends
    /// the full collection once after a new client connects. Clients whose
    /// channel is disconnected are removed; clients whose queue is full keep
    /// the connection but drop this frame.
    pub fn send(&mut self, frame: &puffin::FrameData) -> anyhow::Result<()> {
        puffin::profile_function!();

        // Keep scope_collection up-to-date
        for new_scope in &frame.scope_delta {
            self.scope_collection.insert(Arc::clone(new_scope));
        }

        // Nothing to send if no clients => Early return.
        if self.clients.is_empty() {
            return Ok(());
        }

        let mut packet = vec![];

        // Packet layout: protocol version header, then the encoded frame.
        packet
            .write_all(&crate::PROTOCOL_VERSION.to_le_bytes())
            .context("Encode puffin `PROTOCOL_VERSION` in packet to be send to client.")?;

        // Include the full scope collection only when a new client needs it.
        let scope_collection = if self.send_all_scopes {
            Some(&self.scope_collection)
        } else {
            None
        };

        frame
            .write_into(scope_collection, &mut packet)
            .context("Encode puffin frame")?;
        self.send_all_scopes = false;

        let packet: Packet = packet.into();

        // Fan out to all clients; drop disconnected ones, keep slow ones.
        self.clients.retain(|client| match &client.packet_tx {
            None => false,
            Some(packet_tx) => match packet_tx.try_send(Arc::clone(&packet)) {
                Ok(()) => true,
                Err(crossbeam_channel::TrySendError::Disconnected(_)) => false,
                Err(crossbeam_channel::TrySendError::Full(_)) => {
                    log::info!(
                        "puffin client {} is not accepting data fast enough; dropping a frame",
                        client.client_addr
                    );
                    true
                }
            },
        });
        self.num_clients.store(self.clients.len(), Ordering::SeqCst);

        Ok(())
    }
}
428
429#[expect(clippy::needless_pass_by_value)]
430fn client_loop(
431    packet_rx: crossbeam_channel::Receiver<Packet>,
432    client_addr: SocketAddr,
433    mut tcp_stream: TcpStream,
434) {
435    while let Ok(packet) = packet_rx.recv() {
436        if let Err(err) = tcp_stream.write_all(&packet) {
437            log::info!(
438                "puffin server failed sending to {}: {} (kind: {:?})",
439                client_addr,
440                err,
441                err.kind()
442            );
443            break;
444        }
445    }
446}