use crate::{lowlevel::Extensions, SftpAuxiliaryData};
use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering};
use once_cell::sync::OnceCell;
use tokio::{runtime::Handle, sync::Notify};
use tokio_util::sync::CancellationToken;
/// Pair of length limits, one for reads and one for writes.
///
/// Stored inside [`ConnInfo`] and handed out by value (the type is `Copy`).
// NOTE(review): presumably these are the maximum payload sizes in bytes for a
// single read/write request — confirm against the code that fills `ConnInfo`.
#[derive(Debug, Copy, Clone)]
pub(super) struct Limits {
/// Length limit applied to reads.
pub(super) read_len: u32,
/// Length limit applied to writes.
pub(super) write_len: u32,
}
/// Per-connection information that is written once into
/// `Auxiliary::conn_info` and read back through `Auxiliary::limits` and
/// `Auxiliary::extensions`.
#[derive(Debug)]
pub(super) struct ConnInfo {
/// Read/write length limits for this connection.
pub(super) limits: Limits,
/// Extension set for this connection.
// NOTE(review): presumably the extensions advertised by the server — confirm.
pub(super) extensions: Extensions,
}
/// Shared bookkeeping state for one connection: request counters,
/// notification handles, shutdown state and caller-supplied configuration.
///
/// All mutation goes through atomics or `Notify`, so every method on this
/// type takes `&self`.
#[derive(Debug)]
pub(super) struct Auxiliary {
/// Connection info; empty after `Auxiliary::new` and expected to be set
/// exactly once before `limits()` / `extensions()` are called (those panic
/// if it is still unset).
pub(super) conn_info: OnceCell<ConnInfo>,
/// Cancellation token created fresh in `Auxiliary::new`.
pub(super) cancel_token: CancellationToken,
/// Notified on every `wakeup_flush_task` call and by `order_shutdown`.
pub(super) flush_end_notify: Notify,
/// Notified by `trigger_flushing`, by `order_shutdown`, and by
/// `wakeup_flush_task` when the pending count reaches the threshold.
pub(super) flush_immediately: Notify,
/// Counter incremented by `wakeup_flush_task`; read by
/// `get_pending_requests`.
pub(super) pending_requests: AtomicUsize,
/// Threshold compared against the previous value of `pending_requests`
/// in `wakeup_flush_task`.
pub(super) max_pending_requests: u16,
/// Notification handle created in `Auxiliary::new`.
// NOTE(review): not used in this file; presumably signalled/awaited by the
// read task — confirm.
pub(super) read_end_notify: Notify,
/// Counter incremented by `wakeup_flush_task`, before `pending_requests`.
pub(super) requests_to_read: AtomicUsize,
/// Starts at 0; `order_shutdown` stores 1.
pub(super) shutdown_stage: AtomicU8,
/// Starts at 1; adjusted by `inc_active_user_count` /
/// `dec_active_user_count`. Dropping to zero triggers `order_shutdown`.
pub(super) active_user_count: AtomicU64,
/// Caller-supplied auxiliary data, stored as given.
pub(super) auxiliary_data: SftpAuxiliaryData,
/// Caller-supplied limit, exposed via `tokio_compat_file_write_limit()`.
pub(super) tokio_compat_file_write_limit: usize,
/// Caller-supplied tokio runtime handle, exposed via `tokio_handle()`.
pub(super) tokio_handle: Handle,
}
impl Auxiliary {
pub(super) fn new(
max_pending_requests: u16,
auxiliary_data: SftpAuxiliaryData,
tokio_compat_file_write_limit: usize,
tokio_handle: Handle,
) -> Self {
Self {
conn_info: OnceCell::new(),
cancel_token: CancellationToken::new(),
flush_end_notify: Notify::new(),
flush_immediately: Notify::new(),
pending_requests: AtomicUsize::new(0),
max_pending_requests,
read_end_notify: Notify::new(),
requests_to_read: AtomicUsize::new(0),
shutdown_stage: AtomicU8::new(0),
active_user_count: AtomicU64::new(1),
auxiliary_data,
tokio_compat_file_write_limit,
tokio_handle,
}
}
pub(super) fn wakeup_flush_task(&self) {
self.requests_to_read.fetch_add(1, Ordering::Relaxed);
let pending_requests = self.pending_requests.fetch_add(1, Ordering::Relaxed);
self.flush_end_notify.notify_one();
if pending_requests == self.max_pending_requests() {
self.flush_immediately.notify_one();
}
}
fn conn_info(&self) -> &ConnInfo {
self.conn_info
.get()
.expect("auxiliary.conn_info shall be initialized by sftp::Sftp::new")
}
pub(super) fn extensions(&self) -> Extensions {
self.conn_info().extensions
}
pub(super) fn limits(&self) -> Limits {
self.conn_info().limits
}
pub(super) fn max_pending_requests(&self) -> usize {
self.max_pending_requests as usize
}
pub(super) fn order_shutdown(&self) {
self.shutdown_stage.store(1, Ordering::Relaxed);
self.flush_immediately.notify_one();
self.flush_end_notify.notify_one();
}
pub(super) fn trigger_flushing(&self) {
self.flush_immediately.notify_one();
}
pub(super) fn get_pending_requests(&self) -> usize {
self.pending_requests.load(Ordering::Relaxed)
}
pub(super) fn inc_active_user_count(&self) {
self.active_user_count.fetch_add(1, Ordering::Relaxed);
}
pub(super) fn dec_active_user_count(&self) {
if self.active_user_count.fetch_sub(1, Ordering::Relaxed) == 1 {
self.order_shutdown()
}
}
pub(super) fn tokio_compat_file_write_limit(&self) -> usize {
self.tokio_compat_file_write_limit
}
pub(super) fn tokio_handle(&self) -> &Handle {
&self.tokio_handle
}
}