use std::future::Future;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use tokio::net::TcpStream;
use crate::error::{Error, Result};
use super::RunningProcess;
/// Pause between consecutive readiness checks in `poll_until`; the actual sleep is shortened
/// when less time than this remains before the deadline.
const READINESS_POLL: Duration = Duration::from_millis(50);
/// Upper bound on a single TCP connect attempt made by `wait_for_port`.
const CONNECT_ATTEMPT_CAP: Duration = Duration::from_secs(1);
impl RunningProcess {
/// Reads lines from `self.stdout_lines()` until one satisfies `predicate`.
///
/// Returns the first matching line. The whole scan is bounded by `within` via
/// `tokio::time::timeout`.
///
/// # Errors
///
/// Returns the error built by `not_ready` when the line stream ends without a match or when
/// `within` elapses first.
pub async fn wait_for_line(
    &mut self,
    predicate: impl Fn(&str) -> bool,
    within: Duration,
) -> Result<String> {
    use tokio_stream::StreamExt;

    let mut stdout = self.stdout_lines();
    let scan = async {
        loop {
            match stdout.next().await {
                Some(candidate) if predicate(&candidate) => break Some(candidate),
                // Non-matching line: keep reading.
                Some(_) => {}
                // Stream is exhausted; nothing matched.
                None => break None,
            }
        }
    };

    // A timeout (`Err`) and an exhausted stream (`Ok(None)`) both collapse to `None`.
    let matched = tokio::time::timeout(within, scan).await.ok().flatten();
    matched.ok_or_else(|| self.not_ready(within))
}
/// Polls the caller-supplied asynchronous `check` until it resolves to `true`.
///
/// This is a direct delegation to `poll_until`, which calls `check` repeatedly and gives up
/// with the `not_ready` error once `within` has elapsed or `has_exited_now()` returns true.
///
/// # Errors
///
/// Propagates whatever error `poll_until` returns.
pub async fn wait_for<F, Fut>(&mut self, check: F, within: Duration) -> Result<()>
where
F: FnMut() -> Fut,
Fut: Future<Output = bool>,
{
self.poll_until(check, within).await
}
/// Waits until a TCP connection to `addr` can be established.
///
/// Connection attempts are repeated through `poll_until`. Each attempt is limited to the
/// shorter of `CONNECT_ATTEMPT_CAP` and the time left out of `within`, but never less than one
/// millisecond, so an attempt started right at the deadline still gets a chance to complete.
///
/// # Errors
///
/// Propagates the error returned by `poll_until` when no connection succeeds.
pub async fn wait_for_port(&mut self, addr: SocketAddr, within: Duration) -> Result<()> {
    // Measure elapsed time against `within` instead of computing `Instant::now() + within`:
    // that addition panics when the sum is not representable (for example when a caller
    // passes `Duration::MAX`), whereas the saturating subtraction below cannot.
    let started = Instant::now();
    self.poll_until(
        move || {
            // Sampled when the attempt is created, i.e. once per polling round.
            let remaining = within.saturating_sub(started.elapsed());
            async move {
                let cap = CONNECT_ATTEMPT_CAP
                    .min(remaining)
                    .max(Duration::from_millis(1));
                // Only a completed, successful connect counts; a timeout or a connect
                // error both mean "not ready yet".
                matches!(
                    tokio::time::timeout(cap, TcpStream::connect(addr)).await,
                    Ok(Ok(_))
                )
            }
        },
        within,
    )
    .await
}
/// Repeatedly awaits `check` until it yields `true` or the wait has to be abandoned.
///
/// Each round runs in this order:
/// 1. await `check()`; `true` ends the wait successfully;
/// 2. stop with an error if `has_exited_now()` returns true (presumably the child process has
///    already terminated — confirm against its definition);
/// 3. stop with an error if nothing is left of `within`;
/// 4. sleep for `READINESS_POLL` or the remaining time, whichever is shorter.
///
/// `check` is therefore always run at least once, even for a zero `within`, and every call is
/// awaited to completion: the deadline is only examined between attempts, so a check that
/// never resolves is not interrupted here.
///
/// # Errors
///
/// Returns the error built by `not_ready` in cases 2 and 3.
async fn poll_until<F, Fut>(&mut self, mut check: F, within: Duration) -> Result<()>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>,
{
    // Track elapsed time rather than a precomputed `Instant::now() + within` deadline: the
    // addition panics when the sum is not representable (for example `Duration::MAX`), while
    // `within - elapsed` with saturation yields the same remaining time without that risk.
    let started = Instant::now();
    loop {
        if check().await {
            return Ok(());
        }
        if self.has_exited_now() {
            return Err(self.not_ready(within));
        }
        let remaining = within.saturating_sub(started.elapsed());
        if remaining.is_zero() {
            return Err(self.not_ready(within));
        }
        // Never sleep past the deadline.
        tokio::time::sleep(READINESS_POLL.min(remaining)).await;
    }
}
/// Builds the `Error::NotReady` value reported when a readiness wait gives up.
///
/// The error carries a clone of `self.program` and the `within` budget the caller supplied.
fn not_ready(&self, within: Duration) -> Error {
    let program = self.program.clone();
    Error::NotReady {
        program,
        timeout: within,
    }
}
}