use bytes::Bytes;
use url::Url;
use crate::rtsp::msg::{self, OwnedMessage, StatusCode};
use super::{ResponseMode, RtspConnection, SessionOptions, Tool};
use crate::{Error, error::ErrorInt};
const EXISTING_CONN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
const FRESH_CONN_INITIAL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(1);
const FRESH_CONN_MAX_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(16);
#[allow(clippy::too_many_arguments)]
pub(super) async fn background_teardown(
seqnum: Option<u64>,
base_url: Url,
tool: Option<Tool>,
session_id: Box<str>,
just_try_once: bool,
options: SessionOptions,
requested_auth: Option<http_auth::PasswordClient>,
conn: Option<RtspConnection>,
mut tx: tokio::sync::watch::Sender<Option<Result<(), Error>>>,
expires: tokio::time::Instant,
) {
log::debug!(
"TEARDOWN {} starting for URL {}",
&*session_id,
base_url.as_str(),
);
if tokio::time::timeout_at(
expires,
teardown_loop_forever(
base_url,
tool,
&session_id,
just_try_once,
&options,
requested_auth,
conn,
&mut tx,
),
)
.await
.is_err()
{
log::debug!("TEARDOWN {} aborted on session expiration", &*session_id);
}
if let Some(ref session_group) = options.session_group {
let seqnum = seqnum.expect("seqnum specified when session_group exists");
log::trace!(
"Clearing session {:?}/{} for id {:?}",
session_group.debug_id(),
seqnum,
&*session_id
);
if !session_group.try_remove_seqnum(seqnum) {
log::warn!(
"Unable to find session {:?}/{} for id {:?} on TEARDOWN",
session_group.debug_id(),
seqnum,
&*session_id
);
}
}
let _ = tx.send(Some(Ok(())));
}
#[allow(clippy::too_many_arguments)]
pub(super) async fn teardown_loop_forever(
url: Url,
tool: Option<Tool>,
session_id: &str,
just_try_once: bool,
options: &SessionOptions,
mut requested_auth: Option<http_auth::PasswordClient>,
mut conn: Option<RtspConnection>,
tx: &mut tokio::sync::watch::Sender<Option<Result<(), Error>>>,
) {
let mut req = OwnedMessage::Request {
head: msg::Request {
method: msg::Method::TEARDOWN,
request_uri: Some(url.clone()),
headers: [(
msg::HeaderName::SESSION,
msg::HeaderValue::try_from(session_id.to_string()).unwrap(),
)]
.into(),
},
body: Bytes::new(),
};
let attempt_deadline = tokio::time::sleep(EXISTING_CONN_TIMEOUT);
tokio::pin!(attempt_deadline);
if let Some(conn) = conn.take() {
tokio::select! {
biased;
r = attempt(&mut req, tool.as_ref(), options, &mut requested_auth, conn) => {
match r {
Ok(status) => {
log::debug!("TEARDOWN {} on existing conn succeeded (status {}).", session_id, u16::from(status));
return
},
Err(e) => {
log::debug!("TEARDOWN {} on existing conn failed: {}", session_id, &e);
},
}
},
_ = &mut attempt_deadline => log::debug!("TEARDOWN {} on existing conn timed out", session_id),
}
};
if just_try_once {
log::debug!(
"Giving up on TEARDOWN {}; use TearDownPolicy::Always to try harder",
session_id
);
return;
}
let mut timeout = FRESH_CONN_INITIAL_TIMEOUT;
for attempt_num in 1.. {
attempt_deadline
.as_mut()
.reset(tokio::time::Instant::now() + timeout);
let attempt = async {
let conn = RtspConnection::connect(&url).await?;
attempt(&mut req, tool.as_ref(), options, &mut requested_auth, conn).await
};
tokio::select! {
biased;
r = attempt => {
match r {
Ok(status) => {
log::debug!("TEARDOWN {} fresh connection attempt {} succeeded (status {}).", session_id, attempt_num, u16::from(status));
return
},
Err(e) => {
log::debug!("TEARDOWN {} fresh connection attempt {} failed: {}", session_id, attempt_num, &e);
let _ = tx.send(Some(Err(e)));
attempt_deadline.as_mut().await;
},
}
},
_ = &mut attempt_deadline => {
log::debug!("TEARDOWN {} fresh connection attempt {} timed out", session_id, attempt_num);
let _ = tx.send(Some(Err(wrap!(ErrorInt::Timeout))));
},
}
timeout = std::cmp::min(timeout * 2, FRESH_CONN_MAX_TIMEOUT);
}
}
async fn attempt(
req: &mut OwnedMessage,
tool: Option<&Tool>,
options: &SessionOptions,
requested_auth: &mut Option<http_auth::PasswordClient>,
mut conn: RtspConnection,
) -> Result<StatusCode, Error> {
let e = match conn
.send(ResponseMode::Teardown, options, tool, requested_auth, req)
.await
{
Ok((_ctx, _cseq, resp, _body)) => return Ok(resp.status_code),
Err(e) => e,
};
match *e.0 {
ErrorInt::RtspResponseError { status, .. }
if status == StatusCode::SESSION_NOT_FOUND ||
status == StatusCode::INTERNAL_SERVER_ERROR =>
{
Ok(status)
}
_ => Err(e),
}
}