use std::time::Duration;
use anyhow::{anyhow, Context, Result};
use bairelay_neolink_core::bc_protocol::BcCamera;
use futures::future::BoxFuture;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use crate::bc_opts::{build_bc_opts, max_encryption};
use crate::config::CameraConfig;
use crate::oneshot::errors::InterruptedError;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
const LOGIN_TIMEOUT: Duration = Duration::from_secs(30);
const OP_TIMEOUT: Duration = Duration::from_secs(30);
const LOGOUT_TIMEOUT: Duration = Duration::from_secs(5);
pub async fn run<F, T>(cfg: &CameraConfig, cancel: CancellationToken, op: F) -> Result<T>
where
F: for<'a> FnOnce(&'a BcCamera) -> BoxFuture<'a, Result<T>>,
{
let opts = build_bc_opts(cfg);
let max_enc = max_encryption(cfg);
tokio::select! {
_ = cancel.cancelled() => Err(InterruptedError::new().into()),
res = async move {
let camera = timeout(CONNECT_TIMEOUT, BcCamera::new(&opts))
.await
.context("connect timed out")?
.context("connect failed")?;
timeout(LOGIN_TIMEOUT, camera.login_with_maxenc(max_enc))
.await
.context("login timed out")?
.context("login failed")?;
let op_result: Result<T> = match timeout(OP_TIMEOUT, op(&camera)).await {
Ok(inner) => inner,
Err(_) => Err(anyhow!("operation timed out")),
};
let _ = timeout(LOGOUT_TIMEOUT, camera.logout()).await;
op_result
} => res,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::test_helpers::minimal_camera_config;
#[tokio::test]
async fn pre_cancelled_token_short_circuits_with_interrupted_error() {
let cfg = minimal_camera_config("cancel-test");
let cancel = CancellationToken::new();
cancel.cancel();
let op_ran = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let op_ran_inner = op_ran.clone();
let result: Result<()> = run(&cfg, cancel, move |_cam| {
let op_ran_inner = op_ran_inner.clone();
Box::pin(async move {
op_ran_inner.store(true, std::sync::atomic::Ordering::Relaxed);
Ok(())
})
})
.await;
let err = result.expect_err("pre-cancel must error");
assert!(
err.downcast_ref::<InterruptedError>().is_some(),
"expected InterruptedError, got: {err:#}"
);
assert!(
!op_ran.load(std::sync::atomic::Ordering::Relaxed),
"op closure must not run when cancel fires first"
);
}
}