1use crate::{
2 LeptosBrowserTestError, LeptosTestAppConfig, cargo_leptos,
3 error::StartupFailureContext,
4 ports,
5 site::{format_base_url, parse_socket_addr},
6 startup::StartupLogs,
7};
8use rootcause::{IntoReport, Report, bail, prelude::ResultExt};
9use std::path::{Path, PathBuf};
10use std::time::Duration;
11use tokio::io::AsyncWrite;
12use tokio_process_tools::{
13 AutoName, BroadcastOutputStream, Consumable, Consumer, DEFAULT_MAX_BUFFERED_CHUNKS,
14 DEFAULT_READ_CHUNK_SIZE, GracefulShutdown, LineParsingOptions, Next, NumBytesExt, ParseLines,
15 Process, ReliableWithBackpressure, ReplayEnabled, TerminateOnDrop, WaitForLineResult,
16};
17use unwrap_infallible::UnwrapInfallible;
18
19pub struct LeptosTestApp {
25 _process: TerminateOnDrop<BroadcastOutputStream<ReliableWithBackpressure, ReplayEnabled>>,
26 _stdout_replay: Consumer<()>,
27 _stderr_replay: Consumer<()>,
28 base_url: String,
29 site_addr: String,
30 reload_port: u16,
31 app_dir: PathBuf,
32}
33
34impl LeptosTestApp {
35 pub async fn serve(
41 app_dir: impl Into<PathBuf>,
42 ) -> Result<Self, Report<LeptosBrowserTestError>> {
43 LeptosTestAppConfig::new(app_dir).start().await
44 }
45
46 #[must_use]
48 pub fn base_url(&self) -> &str {
49 &self.base_url
50 }
51
52 #[must_use]
54 pub fn site_addr(&self) -> &str {
55 &self.site_addr
56 }
57
58 #[must_use]
60 pub const fn reload_port(&self) -> u16 {
61 self.reload_port
62 }
63
64 #[must_use]
66 pub fn app_dir(&self) -> &Path {
67 &self.app_dir
68 }
69}
70
71struct RuntimeConfig {
74 app_dir: PathBuf,
75 site_addr: String,
76 reload_port: u16,
77 base_url: String,
78 startup_line: String,
79}
80
81struct SpawnedProcess {
83 process: TerminateOnDrop<BroadcastOutputStream<ReliableWithBackpressure, ReplayEnabled>>,
84 stdout_replay: Consumer<()>,
85 stderr_replay: Consumer<()>,
86 logs: StartupLogs,
87}
88
89const MAX_PORT_COLLISION_RETRIES: u32 = 3;
95
96pub(crate) async fn start_configured_app(
97 config: LeptosTestAppConfig,
98) -> Result<LeptosTestApp, Report<LeptosBrowserTestError>> {
99 let auto_allocated = config.site_addr.is_none() || config.reload_port.is_none();
100 let max_attempts = if auto_allocated {
101 MAX_PORT_COLLISION_RETRIES
102 } else {
103 1
104 };
105
106 for attempt in 1..=max_attempts {
107 let runtime = resolve_runtime_config(&config)?;
108 let spawned = spawn_with_log_capture(&runtime, &config)?;
109 match wait_for_ready(&spawned, &runtime, &config).await {
110 Ok(()) => {
111 tracing::info!("{} started at {}", config.app_name, runtime.base_url);
112 return Ok(build_app(spawned, runtime));
113 }
114 Err(err) if attempt < max_attempts && is_port_collision(&err) => {
115 tracing::warn!(
116 "{} port collision on attempt {attempt}/{max_attempts}; retrying with fresh ports",
117 config.app_name,
118 );
119 drop(spawned);
120 }
121 Err(err) => return Err(err),
122 }
123 }
124
125 unreachable!("start_configured_app retry loop must exit via return")
128}
129
130fn is_port_collision(err: &Report<LeptosBrowserTestError>) -> bool {
131 let (LeptosBrowserTestError::StartupTimedOut { ctx, .. }
132 | LeptosBrowserTestError::StartupStdoutClosed(ctx)
133 | LeptosBrowserTestError::StreamRead(ctx)) = err.current_context()
134 else {
135 return false;
136 };
137 stderr_indicates_port_collision(&ctx.stderr_tail)
138 || stderr_indicates_port_collision(&ctx.stdout_tail)
139}
140
141fn stderr_indicates_port_collision(text: &str) -> bool {
142 let lowered = text.to_ascii_lowercase();
143 lowered.contains("address already in use")
146 || lowered.contains("only one usage of each socket address")
147}
148
149fn resolve_runtime_config(
150 config: &LeptosTestAppConfig,
151) -> Result<RuntimeConfig, Report<LeptosBrowserTestError>> {
152 let app_dir =
153 config
154 .app_dir
155 .canonicalize()
156 .context_with(|| LeptosBrowserTestError::ResolveAppDir {
157 app_name: config.app_name.clone(),
158 app_dir: config.app_dir.clone(),
159 })?;
160
161 let site_addr = if let Some(addr) = config.site_addr.as_deref() {
162 if parse_socket_addr(addr).is_none() {
163 bail!(LeptosBrowserTestError::InvalidSiteAddr {
164 app_name: config.app_name.clone(),
165 site_addr: addr.to_owned(),
166 });
167 }
168 addr.to_owned()
169 } else {
170 let port =
171 ports::find_free_port().context_with(|| LeptosBrowserTestError::FindFreeSitePort {
172 app_name: config.app_name.clone(),
173 })?;
174 format!("127.0.0.1:{port}")
175 };
176 let site_port = parse_socket_addr(&site_addr).map(|sa| sa.port());
177 let reload_port = match config.reload_port {
178 Some(reload_port) => reload_port,
179 None => ports::find_free_port_excluding(site_port).context_with(|| {
180 LeptosBrowserTestError::FindFreeReloadPort {
181 app_name: config.app_name.clone(),
182 }
183 })?,
184 };
185 let base_url = format_base_url(config.site_scheme, &site_addr);
186 let startup_line = config
187 .startup_line
188 .clone()
189 .unwrap_or_else(|| format!("listening on {base_url}"));
190
191 Ok(RuntimeConfig {
192 app_dir,
193 site_addr,
194 reload_port,
195 base_url,
196 startup_line,
197 })
198}
199
200fn spawn_with_log_capture(
201 runtime: &RuntimeConfig,
202 config: &LeptosTestAppConfig,
203) -> Result<SpawnedProcess, Report<LeptosBrowserTestError>> {
204 tracing::info!(
205 graceful_shutdown_timeout = ?config.graceful_shutdown_timeout,
206 graceful_shutdown_unix_signal = ?config.graceful_shutdown_unix_signal,
207 "Starting {} in {:?} on {} (reload port {}).",
208 config.app_name,
209 runtime.app_dir,
210 runtime.site_addr,
211 runtime.reload_port,
212 );
213
214 let cmd = cargo_leptos::command(
215 config.mode,
216 config.cargo_bin.as_deref(),
217 &runtime.app_dir,
218 &runtime.site_addr,
219 runtime.reload_port,
220 config.graceful_shutdown_timeout,
221 config.graceful_shutdown_unix_signal,
222 &config.extra_env,
223 );
224
225 let timeout = config.graceful_shutdown_timeout + Duration::from_secs(10);
228 let graceful_shutdown = GracefulShutdown::builder()
230 .unix_sigint(timeout)
231 .windows_ctrl_break(timeout)
232 .build();
233
234 let process = Process::new(cmd)
235 .name(AutoName::program_only())
236 .stdout_and_stderr(|stream| {
237 stream
238 .broadcast()
239 .reliable_with_backpressure()
240 .replay_last_bytes(1.megabytes())
241 .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
242 .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
243 })
244 .spawn()
245 .context_with(|| LeptosBrowserTestError::SpawnCargoLeptos {
246 app_name: config.app_name.clone(),
247 mode: config.mode,
248 })?
249 .terminate_on_drop(graceful_shutdown);
250
251 let logs = StartupLogs::new(config.startup_log_tail_lines);
252 let forward_logs = config.forward_logs;
253
254 #[allow(clippy::items_after_statements)]
255 async fn write_to<W: AsyncWrite + Unpin>(mut to: W, data: &str) -> tokio::io::Result<()> {
256 use tokio::io::AsyncWriteExt;
257 to.write_all(data.as_bytes()).await?;
258 to.write_all(b"\n").await?;
259 to.flush().await?;
260 Ok(())
261 }
262
263 let stdout_buffer = logs.stdout.clone();
272 let stdout_replay = process
273 .stdout()
274 .consume_async(ParseLines::inspect_async(
275 LineParsingOptions::default(),
276 move |line| {
277 stdout_buffer.push(&line);
278 let line = line.to_string();
279 async move {
280 if forward_logs && let Err(err) = write_to(tokio::io::stdout(), &line).await {
281 tracing::error!("Could not forward server process output to stdout: {err}");
282 }
283 Next::Continue
284 }
285 },
286 ))
287 .unwrap_infallible();
288
289 let stderr_buffer = logs.stderr.clone();
290 let stderr_replay = process
291 .stderr()
292 .consume_async(ParseLines::inspect_async(
293 LineParsingOptions::default(),
294 move |line| {
295 stderr_buffer.push(&line);
296 let line = line.to_string();
297 async move {
298 if forward_logs && let Err(err) = write_to(tokio::io::stderr(), &line).await {
299 tracing::error!("Could not forward server process output to stderr: {err}");
300 }
301 Next::Continue
302 }
303 },
304 ))
305 .unwrap_infallible();
306
307 Ok(SpawnedProcess {
308 process,
309 stdout_replay,
310 stderr_replay,
311 logs,
312 })
313}
314
315async fn wait_for_ready(
316 spawned: &SpawnedProcess,
317 runtime: &RuntimeConfig,
318 config: &LeptosTestAppConfig,
319) -> Result<(), Report<LeptosBrowserTestError>> {
320 tracing::info!(
321 "Waiting {:?} ({}) for {} to start...",
322 config.startup_timeout,
323 config.startup_timeout_reason,
324 config.app_name,
325 );
326
327 let expected_line = runtime.startup_line.clone();
328 let startup_waiter = spawned.process.stdout().wait_for_line(
329 config.startup_timeout,
330 move |line| line.contains(&expected_line),
331 LineParsingOptions::default(),
332 );
333 spawned.process.seal_output_replay();
334
335 match startup_waiter.await {
336 Ok(WaitForLineResult::Matched) => Ok(()),
337 Ok(WaitForLineResult::StreamClosed) => {
338 bail!(LeptosBrowserTestError::StartupStdoutClosed(
339 startup_failure_context(&config.app_name, &runtime.startup_line, &spawned.logs),
340 ));
341 }
342 Ok(WaitForLineResult::Timeout) => {
343 bail!(LeptosBrowserTestError::StartupTimedOut {
344 ctx: startup_failure_context(
345 &config.app_name,
346 &runtime.startup_line,
347 &spawned.logs
348 ),
349 timeout: config.startup_timeout,
350 reason: config.startup_timeout_reason.clone(),
351 });
352 }
353 Err(err) => Err(err
354 .into_report()
355 .context(LeptosBrowserTestError::StreamRead(startup_failure_context(
356 &config.app_name,
357 &runtime.startup_line,
358 &spawned.logs,
359 )))),
360 }
361}
362
363fn build_app(spawned: SpawnedProcess, runtime: RuntimeConfig) -> LeptosTestApp {
364 LeptosTestApp {
365 _process: spawned.process,
366 _stdout_replay: spawned.stdout_replay,
367 _stderr_replay: spawned.stderr_replay,
368 base_url: runtime.base_url,
369 site_addr: runtime.site_addr,
370 reload_port: runtime.reload_port,
371 app_dir: runtime.app_dir,
372 }
373}
374
375fn startup_failure_context(
376 app_name: &str,
377 expected_line: &str,
378 logs: &StartupLogs,
379) -> StartupFailureContext {
380 StartupFailureContext {
381 app_name: app_name.to_owned(),
382 expected_line: expected_line.to_owned(),
383 stdout_tail: logs.stdout_tail(),
384 stderr_tail: logs.stderr_tail(),
385 }
386}
387
388#[cfg(test)]
389mod tests {
390 use std::time::Duration;
391
392 use assertr::prelude::*;
393 use rootcause::Report;
394
395 use super::{
396 LeptosBrowserTestError, StartupFailureContext, is_port_collision,
397 stderr_indicates_port_collision,
398 };
399
400 fn ctx_with_stderr(stderr: &str) -> StartupFailureContext {
401 StartupFailureContext {
402 app_name: "demo".to_owned(),
403 expected_line: "listening on".to_owned(),
404 stdout_tail: String::new(),
405 stderr_tail: stderr.to_owned(),
406 }
407 }
408
409 #[test]
410 fn detects_unix_address_already_in_use() {
411 assert_that!(stderr_indicates_port_collision(
412 "Error: Address already in use (os error 48)"
413 ))
414 .is_true();
415 }
416
417 #[test]
418 fn detects_lowercase_address_already_in_use() {
419 assert_that!(stderr_indicates_port_collision(
420 "thread 'main' panicked: address already in use"
421 ))
422 .is_true();
423 }
424
425 #[test]
426 fn detects_windows_phrasing() {
427 assert_that!(stderr_indicates_port_collision(
428 "Only one usage of each socket address (protocol/network address/port) is normally permitted"
429 ))
430 .is_true();
431 }
432
433 #[test]
434 fn rejects_unrelated_errors() {
435 assert_that!(stderr_indicates_port_collision(
436 "error: linking with `cc` failed: exit status: 1"
437 ))
438 .is_false();
439 assert_that!(stderr_indicates_port_collision("")).is_false();
440 }
441
442 #[test]
443 fn is_port_collision_recognizes_startup_timed_out() {
444 let report = Report::new(LeptosBrowserTestError::StartupTimedOut {
445 ctx: ctx_with_stderr("Address already in use"),
446 timeout: Duration::from_secs(5),
447 reason: "test".to_owned(),
448 });
449 assert_that!(is_port_collision(&report)).is_true();
450 }
451
452 #[test]
453 fn is_port_collision_recognizes_stdout_closed() {
454 let report = Report::new(LeptosBrowserTestError::StartupStdoutClosed(
455 ctx_with_stderr("address already in use"),
456 ));
457 assert_that!(is_port_collision(&report)).is_true();
458 }
459
460 #[test]
461 fn is_port_collision_ignores_unrelated_variants() {
462 let report = Report::new(LeptosBrowserTestError::FindFreeSitePort {
463 app_name: "demo".to_owned(),
464 });
465 assert_that!(is_port_collision(&report)).is_false();
466 }
467
468 #[test]
469 fn is_port_collision_ignores_startup_with_clean_stderr() {
470 let report = Report::new(LeptosBrowserTestError::StartupTimedOut {
471 ctx: ctx_with_stderr("compilation error: ..."),
472 timeout: Duration::from_secs(5),
473 reason: "test".to_owned(),
474 });
475 assert_that!(is_port_collision(&report)).is_false();
476 }
477}