Skip to main content

leptos_browser_test/
app.rs

1use crate::{
2    LeptosBrowserTestError, LeptosTestAppConfig, cargo_leptos,
3    error::StartupFailureContext,
4    ports,
5    site::{format_base_url, parse_socket_addr},
6    startup::StartupLogs,
7};
8use rootcause::{IntoReport, Report, bail, prelude::ResultExt};
9use std::path::{Path, PathBuf};
10use std::time::Duration;
11use tokio::io::AsyncWrite;
12use tokio_process_tools::{
13    AutoName, BroadcastOutputStream, Consumable, Consumer, DEFAULT_MAX_BUFFERED_CHUNKS,
14    DEFAULT_READ_CHUNK_SIZE, GracefulShutdown, LineParsingOptions, Next, NumBytesExt, ParseLines,
15    Process, ReliableWithBackpressure, ReplayEnabled, TerminateOnDrop, WaitForLineResult,
16};
17use unwrap_infallible::UnwrapInfallible;
18
/// A running Leptos test app process.
///
/// The app process is terminated automatically when this value is dropped. This relies on
/// [`TerminateOnDrop`], which requires an active multithreaded Tokio runtime.
/// Browser tests should use `#[tokio::test(flavor = "multi_thread")]`.
pub struct LeptosTestApp {
    /// Owning handle of the spawned process. Never read after construction; it is stored so
    /// that dropping the app also drops the [`TerminateOnDrop`] wrapper.
    _process: TerminateOnDrop<BroadcastOutputStream<ReliableWithBackpressure, ReplayEnabled>>,
    /// Handle of the stdout line consumer attached at spawn time. Never read.
    /// NOTE(review): presumably kept so the consumer keeps running for the app's lifetime —
    /// confirm against `tokio_process_tools::Consumer` drop semantics.
    _stdout_replay: Consumer<()>,
    /// Handle of the stderr line consumer attached at spawn time. Never read.
    _stderr_replay: Consumer<()>,
    /// Scheme plus site address, as produced by `format_base_url`.
    base_url: String,
    /// Socket address string the app was started with, for example `127.0.0.1:3000`.
    site_addr: String,
    /// Reload port handed to cargo-leptos for this run.
    reload_port: u16,
    /// Canonicalized form of the configured app directory.
    app_dir: PathBuf,
}
33
34impl LeptosTestApp {
35    /// Start a test app with the default config.
36    ///
37    /// # Errors
38    ///
39    /// Returns an error if startup fails.
40    pub async fn serve(
41        app_dir: impl Into<PathBuf>,
42    ) -> Result<Self, Report<LeptosBrowserTestError>> {
43        LeptosTestAppConfig::new(app_dir).start().await
44    }
45
46    /// The base URL, for example `http://127.0.0.1:3000` or `https://127.0.0.1:3000`.
47    #[must_use]
48    pub fn base_url(&self) -> &str {
49        &self.base_url
50    }
51
52    /// The bound site address, for example `127.0.0.1:3000`.
53    #[must_use]
54    pub fn site_addr(&self) -> &str {
55        &self.site_addr
56    }
57
58    /// The reload port passed through `LEPTOS_RELOAD_PORT`.
59    #[must_use]
60    pub const fn reload_port(&self) -> u16 {
61        self.reload_port
62    }
63
64    /// The canonical app directory.
65    #[must_use]
66    pub fn app_dir(&self) -> &Path {
67        &self.app_dir
68    }
69}
70
/// Resolved view of a `LeptosTestAppConfig` after path canonicalization, port allocation,
/// and startup-line/base-URL derivation. Recomputed for each spawn attempt.
struct RuntimeConfig {
    /// Canonicalized app directory.
    app_dir: PathBuf,
    /// Site address string: either the caller-pinned value (validated with
    /// `parse_socket_addr`) or `127.0.0.1:<free port>`.
    site_addr: String,
    /// Caller-pinned reload port, or a free port chosen to differ from the site port.
    reload_port: u16,
    /// Base URL derived from the configured scheme and `site_addr`.
    base_url: String,
    /// Text that must appear in a stdout line for the app to count as started; defaults to
    /// `listening on <base_url>` when the config does not override it.
    startup_line: String,
}
80
/// Live process plus the log buffers and replay handles tied to it.
struct SpawnedProcess {
    /// The spawned cargo-leptos process, wrapped so that it is terminated when dropped.
    process: TerminateOnDrop<BroadcastOutputStream<ReliableWithBackpressure, ReplayEnabled>>,
    /// Handle of the consumer that records (and optionally forwards) stdout lines.
    stdout_replay: Consumer<()>,
    /// Handle of the consumer that records (and optionally forwards) stderr lines.
    stderr_replay: Consumer<()>,
    /// Captured stdout/stderr lines, used to build the failure context when startup fails.
    logs: StartupLogs,
}
88
/// Maximum number of spawn attempts (the first try plus any restarts) we make when
/// cargo-leptos fails with a port-bind collision before giving up.
///
/// Despite the name, `start_configured_app` uses this value as the total attempt count, so a
/// value of 3 means one initial attempt and at most two restarts.
///
/// Only applies when the caller did not pin both `site_addr` and `reload_port`; if the user
/// pinned a port and the spawn fails because the port is taken, that's a configuration error,
/// not a race, and we surface it on the first attempt.
const MAX_PORT_COLLISION_RETRIES: u32 = 3;
95
96pub(crate) async fn start_configured_app(
97    config: LeptosTestAppConfig,
98) -> Result<LeptosTestApp, Report<LeptosBrowserTestError>> {
99    let auto_allocated = config.site_addr.is_none() || config.reload_port.is_none();
100    let max_attempts = if auto_allocated {
101        MAX_PORT_COLLISION_RETRIES
102    } else {
103        1
104    };
105
106    for attempt in 1..=max_attempts {
107        let runtime = resolve_runtime_config(&config)?;
108        let spawned = spawn_with_log_capture(&runtime, &config)?;
109        match wait_for_ready(&spawned, &runtime, &config).await {
110            Ok(()) => {
111                tracing::info!("{} started at {}", config.app_name, runtime.base_url);
112                return Ok(build_app(spawned, runtime));
113            }
114            Err(err) if attempt < max_attempts && is_port_collision(&err) => {
115                tracing::warn!(
116                    "{} port collision on attempt {attempt}/{max_attempts}; retrying with fresh ports",
117                    config.app_name,
118                );
119                drop(spawned);
120            }
121            Err(err) => return Err(err),
122        }
123    }
124
125    // The retry loop always exits via `return`; this branch is unreachable but keeps the type
126    // checker happy without a panic.
127    unreachable!("start_configured_app retry loop must exit via return")
128}
129
130fn is_port_collision(err: &Report<LeptosBrowserTestError>) -> bool {
131    let (LeptosBrowserTestError::StartupTimedOut { ctx, .. }
132    | LeptosBrowserTestError::StartupStdoutClosed(ctx)
133    | LeptosBrowserTestError::StreamRead(ctx)) = err.current_context()
134    else {
135        return false;
136    };
137    stderr_indicates_port_collision(&ctx.stderr_tail)
138        || stderr_indicates_port_collision(&ctx.stdout_tail)
139}
140
/// Report whether `text` contains one of the known "port already taken" OS error phrases.
///
/// The comparison ignores ASCII case.
fn stderr_indicates_port_collision(text: &str) -> bool {
    // Linux/macOS: "Address already in use" / "address already in use (os error 48)"
    // Windows:     "Only one usage of each socket address (protocol/network address/port)"
    const COLLISION_MARKERS: [&str; 2] = [
        "address already in use",
        "only one usage of each socket address",
    ];

    let haystack = text.to_ascii_lowercase();
    COLLISION_MARKERS
        .iter()
        .any(|marker| haystack.contains(*marker))
}
148
149fn resolve_runtime_config(
150    config: &LeptosTestAppConfig,
151) -> Result<RuntimeConfig, Report<LeptosBrowserTestError>> {
152    let app_dir =
153        config
154            .app_dir
155            .canonicalize()
156            .context_with(|| LeptosBrowserTestError::ResolveAppDir {
157                app_name: config.app_name.clone(),
158                app_dir: config.app_dir.clone(),
159            })?;
160
161    let site_addr = if let Some(addr) = config.site_addr.as_deref() {
162        if parse_socket_addr(addr).is_none() {
163            bail!(LeptosBrowserTestError::InvalidSiteAddr {
164                app_name: config.app_name.clone(),
165                site_addr: addr.to_owned(),
166            });
167        }
168        addr.to_owned()
169    } else {
170        let port =
171            ports::find_free_port().context_with(|| LeptosBrowserTestError::FindFreeSitePort {
172                app_name: config.app_name.clone(),
173            })?;
174        format!("127.0.0.1:{port}")
175    };
176    let site_port = parse_socket_addr(&site_addr).map(|sa| sa.port());
177    let reload_port = match config.reload_port {
178        Some(reload_port) => reload_port,
179        None => ports::find_free_port_excluding(site_port).context_with(|| {
180            LeptosBrowserTestError::FindFreeReloadPort {
181                app_name: config.app_name.clone(),
182            }
183        })?,
184    };
185    let base_url = format_base_url(config.site_scheme, &site_addr);
186    let startup_line = config
187        .startup_line
188        .clone()
189        .unwrap_or_else(|| format!("listening on {base_url}"));
190
191    Ok(RuntimeConfig {
192        app_dir,
193        site_addr,
194        reload_port,
195        base_url,
196        startup_line,
197    })
198}
199
200fn spawn_with_log_capture(
201    runtime: &RuntimeConfig,
202    config: &LeptosTestAppConfig,
203) -> Result<SpawnedProcess, Report<LeptosBrowserTestError>> {
204    tracing::info!(
205        graceful_shutdown_timeout = ?config.graceful_shutdown_timeout,
206        graceful_shutdown_unix_signal = ?config.graceful_shutdown_unix_signal,
207        "Starting {} in {:?} on {} (reload port {}).",
208        config.app_name,
209        runtime.app_dir,
210        runtime.site_addr,
211        runtime.reload_port,
212    );
213
214    let cmd = cargo_leptos::command(
215        config.mode,
216        config.cargo_bin.as_deref(),
217        &runtime.app_dir,
218        &runtime.site_addr,
219        runtime.reload_port,
220        config.graceful_shutdown_timeout,
221        config.graceful_shutdown_unix_signal,
222        &config.extra_env,
223    );
224
225    // The graceful shutdown timeout for cargo-leptos itself must be greater (or at least equal to)
226    // the timeout that the user specified for his application (enforced by cargo-leptos).
227    let timeout = config.graceful_shutdown_timeout + Duration::from_secs(10);
228    // We know that cargo-leptos listens for these signals.
229    let graceful_shutdown = GracefulShutdown::builder()
230        .unix_sigint(timeout)
231        .windows_ctrl_break(timeout)
232        .build();
233
234    let process = Process::new(cmd)
235        .name(AutoName::program_only())
236        .stdout_and_stderr(|stream| {
237            stream
238                .broadcast()
239                .reliable_with_backpressure()
240                .replay_last_bytes(1.megabytes())
241                .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
242                .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
243        })
244        .spawn()
245        .context_with(|| LeptosBrowserTestError::SpawnCargoLeptos {
246            app_name: config.app_name.clone(),
247            mode: config.mode,
248        })?
249        .terminate_on_drop(graceful_shutdown);
250
251    let logs = StartupLogs::new(config.startup_log_tail_lines);
252    let forward_logs = config.forward_logs;
253
254    #[allow(clippy::items_after_statements)]
255    async fn write_to<W: AsyncWrite + Unpin>(mut to: W, data: &str) -> tokio::io::Result<()> {
256        use tokio::io::AsyncWriteExt;
257        to.write_all(data.as_bytes()).await?;
258        to.write_all(b"\n").await?;
259        to.flush().await?;
260        Ok(())
261    }
262
263    // Let's forward captured stdout/stderr lines to the output of our process. We do this
264    // asynchronously using the tokio::io::std{out|err}() handles, as writing to
265    // stdout/stderr directly using print!() could result in unhandled "failed printing to
266    // stdout: Resource temporarily unavailable" errors should the cargo-leptos output be
267    // consumed too slowly. This can happen because tokio puts the stdio fds into
268    // non-blocking mode (once touched) and std print! has no support for that, they just
269    // panic when an EAGAIN error is observed. Tokio's stdio handles instead asynchronously
270    // wait internally, handling the slow drainage and preventing a blocked runtime.
271    let stdout_buffer = logs.stdout.clone();
272    let stdout_replay = process
273        .stdout()
274        .consume_async(ParseLines::inspect_async(
275            LineParsingOptions::default(),
276            move |line| {
277                stdout_buffer.push(&line);
278                let line = line.to_string();
279                async move {
280                    if forward_logs && let Err(err) = write_to(tokio::io::stdout(), &line).await {
281                        tracing::error!("Could not forward server process output to stdout: {err}");
282                    }
283                    Next::Continue
284                }
285            },
286        ))
287        .unwrap_infallible();
288
289    let stderr_buffer = logs.stderr.clone();
290    let stderr_replay = process
291        .stderr()
292        .consume_async(ParseLines::inspect_async(
293            LineParsingOptions::default(),
294            move |line| {
295                stderr_buffer.push(&line);
296                let line = line.to_string();
297                async move {
298                    if forward_logs && let Err(err) = write_to(tokio::io::stderr(), &line).await {
299                        tracing::error!("Could not forward server process output to stderr: {err}");
300                    }
301                    Next::Continue
302                }
303            },
304        ))
305        .unwrap_infallible();
306
307    Ok(SpawnedProcess {
308        process,
309        stdout_replay,
310        stderr_replay,
311        logs,
312    })
313}
314
315async fn wait_for_ready(
316    spawned: &SpawnedProcess,
317    runtime: &RuntimeConfig,
318    config: &LeptosTestAppConfig,
319) -> Result<(), Report<LeptosBrowserTestError>> {
320    tracing::info!(
321        "Waiting {:?} ({}) for {} to start...",
322        config.startup_timeout,
323        config.startup_timeout_reason,
324        config.app_name,
325    );
326
327    let expected_line = runtime.startup_line.clone();
328    let startup_waiter = spawned.process.stdout().wait_for_line(
329        config.startup_timeout,
330        move |line| line.contains(&expected_line),
331        LineParsingOptions::default(),
332    );
333    spawned.process.seal_output_replay();
334
335    match startup_waiter.await {
336        Ok(WaitForLineResult::Matched) => Ok(()),
337        Ok(WaitForLineResult::StreamClosed) => {
338            bail!(LeptosBrowserTestError::StartupStdoutClosed(
339                startup_failure_context(&config.app_name, &runtime.startup_line, &spawned.logs),
340            ));
341        }
342        Ok(WaitForLineResult::Timeout) => {
343            bail!(LeptosBrowserTestError::StartupTimedOut {
344                ctx: startup_failure_context(
345                    &config.app_name,
346                    &runtime.startup_line,
347                    &spawned.logs
348                ),
349                timeout: config.startup_timeout,
350                reason: config.startup_timeout_reason.clone(),
351            });
352        }
353        Err(err) => Err(err
354            .into_report()
355            .context(LeptosBrowserTestError::StreamRead(startup_failure_context(
356                &config.app_name,
357                &runtime.startup_line,
358                &spawned.logs,
359            )))),
360    }
361}
362
363fn build_app(spawned: SpawnedProcess, runtime: RuntimeConfig) -> LeptosTestApp {
364    LeptosTestApp {
365        _process: spawned.process,
366        _stdout_replay: spawned.stdout_replay,
367        _stderr_replay: spawned.stderr_replay,
368        base_url: runtime.base_url,
369        site_addr: runtime.site_addr,
370        reload_port: runtime.reload_port,
371        app_dir: runtime.app_dir,
372    }
373}
374
375fn startup_failure_context(
376    app_name: &str,
377    expected_line: &str,
378    logs: &StartupLogs,
379) -> StartupFailureContext {
380    StartupFailureContext {
381        app_name: app_name.to_owned(),
382        expected_line: expected_line.to_owned(),
383        stdout_tail: logs.stdout_tail(),
384        stderr_tail: logs.stderr_tail(),
385    }
386}
387
/// Unit tests for port-collision detection, covering both the text matcher and the
/// error-variant filter.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use assertr::prelude::*;
    use rootcause::Report;

    use super::{
        LeptosBrowserTestError, StartupFailureContext, is_port_collision,
        stderr_indicates_port_collision,
    };

    /// Build a failure context with the given captured stdout and stderr tails.
    fn ctx_with_tails(stdout: &str, stderr: &str) -> StartupFailureContext {
        StartupFailureContext {
            app_name: "demo".to_owned(),
            expected_line: "listening on".to_owned(),
            stdout_tail: stdout.to_owned(),
            stderr_tail: stderr.to_owned(),
        }
    }

    /// Build a failure context with an empty stdout tail and the given stderr tail.
    fn ctx_with_stderr(stderr: &str) -> StartupFailureContext {
        ctx_with_tails("", stderr)
    }

    #[test]
    fn detects_unix_address_already_in_use() {
        assert_that!(stderr_indicates_port_collision(
            "Error: Address already in use (os error 48)"
        ))
        .is_true();
    }

    #[test]
    fn detects_lowercase_address_already_in_use() {
        assert_that!(stderr_indicates_port_collision(
            "thread 'main' panicked: address already in use"
        ))
        .is_true();
    }

    #[test]
    fn detects_windows_phrasing() {
        assert_that!(stderr_indicates_port_collision(
            "Only one usage of each socket address (protocol/network address/port) is normally permitted"
        ))
        .is_true();
    }

    #[test]
    fn rejects_unrelated_errors() {
        assert_that!(stderr_indicates_port_collision(
            "error: linking with `cc` failed: exit status: 1"
        ))
        .is_false();
        assert_that!(stderr_indicates_port_collision("")).is_false();
    }

    #[test]
    fn is_port_collision_recognizes_startup_timed_out() {
        let report = Report::new(LeptosBrowserTestError::StartupTimedOut {
            ctx: ctx_with_stderr("Address already in use"),
            timeout: Duration::from_secs(5),
            reason: "test".to_owned(),
        });
        assert_that!(is_port_collision(&report)).is_true();
    }

    #[test]
    fn is_port_collision_recognizes_stdout_closed() {
        let report = Report::new(LeptosBrowserTestError::StartupStdoutClosed(
            ctx_with_stderr("address already in use"),
        ));
        assert_that!(is_port_collision(&report)).is_true();
    }

    // The stream-read variant also carries a failure context and must be inspected.
    #[test]
    fn is_port_collision_recognizes_stream_read() {
        let report = Report::new(LeptosBrowserTestError::StreamRead(ctx_with_stderr(
            "Address already in use (os error 98)",
        )));
        assert_that!(is_port_collision(&report)).is_true();
    }

    // The collision message may show up on stdout rather than stderr.
    #[test]
    fn is_port_collision_checks_stdout_tail() {
        let report = Report::new(LeptosBrowserTestError::StartupStdoutClosed(
            ctx_with_tails("Error: Address already in use (os error 48)", ""),
        ));
        assert_that!(is_port_collision(&report)).is_true();
    }

    #[test]
    fn is_port_collision_ignores_unrelated_variants() {
        let report = Report::new(LeptosBrowserTestError::FindFreeSitePort {
            app_name: "demo".to_owned(),
        });
        assert_that!(is_port_collision(&report)).is_false();
    }

    #[test]
    fn is_port_collision_ignores_startup_with_clean_stderr() {
        let report = Report::new(LeptosBrowserTestError::StartupTimedOut {
            ctx: ctx_with_stderr("compilation error: ..."),
            timeout: Duration::from_secs(5),
            reason: "test".to_owned(),
        });
        assert_that!(is_port_collision(&report)).is_false();
    }

    #[test]
    fn is_port_collision_ignores_clean_output_on_both_streams() {
        let report = Report::new(LeptosBrowserTestError::StreamRead(ctx_with_tails(
            "Compiling demo v0.1.0",
            "warning: unused variable",
        )));
        assert_that!(is_port_collision(&report)).is_false();
    }
}