use crate::ogre_std::{
ogre_queues::{
full_sync::full_sync_move::FullSyncMove,
meta_container::MoveContainer,
meta_publisher::MovePublisher,
meta_subscriber::MoveSubscriber,
},
ogre_sync,
};
use std::{
time::Duration,
sync::atomic::{AtomicU32, AtomicBool, Ordering::Relaxed},
pin::Pin,
task::Waker,
};
use std::cell::UnsafeCell;
use minstant::Instant;
/// Bookkeeping for up to `MAX_STREAMS` concurrently existing streams: hands out stream ids,
/// stores one optional [`Waker`] and one "keep running" flag per id, and keeps creation/termination counters.
///
/// Several fields live in `UnsafeCell`s and are mutated through `&self`; see the individual methods
/// for which accesses are guarded by `streams_lock` / `wakers_lock`.
pub struct StreamsManagerBase<const MAX_STREAMS: usize> {
/// Stream ids currently available for new streams: pre-filled by `new()` with every id in `0..MAX_STREAMS`,
/// consumed by `create_stream_id()` and given back by `report_stream_dropped()`
vacant_streams: FullSyncMove<u32, MAX_STREAMS>,
/// Ids in use, in ascending order, followed by `u32::MAX` padding -- rebuilt by `sync_vacant_and_used_streams()`
used_streams: UnsafeCell<Pin<Box<[u32; MAX_STREAMS]>>>,
/// Number of ids currently handed out -- what `running_streams_count()` returns
used_streams_count: AtomicU32,
/// Flag passed to `ogre_sync::lock()` / `unlock()` while `used_streams` is being rebuilt
streams_lock: AtomicBool,
/// Total number of stream ids ever requested (reported in the exhaustion panic message)
created_streams_count: AtomicU32,
/// Total number of streams reported as dropped (reported in the exhaustion panic message)
finished_streams_count: AtomicU32,
/// Flag passed to `ogre_sync::lock()` / `unlock()` while a slot of `wakers` is being written
wakers_lock: AtomicBool,
/// One optional [`Waker`] per stream id, indexed by the id
wakers: UnsafeCell<Pin<Box<[Option<Waker>; MAX_STREAMS]>>>,
/// One flag per stream id, indexed by the id: set to `true` by `create_stream_id()` and to `false` by `cancel_stream()`
keep_streams_running: UnsafeCell<Pin<Box<[bool; MAX_STREAMS]>>>,
/// Name given at construction -- returned by `name()` and used in the exhaustion panic message
streams_manager_name: String,
}
impl<const MAX_STREAMS: usize>
StreamsManagerBase<MAX_STREAMS> {
pub fn new<IntoString: Into<String>>(streams_manager_name: IntoString) -> Self {
Self {
vacant_streams: {
let vacant_streams = FullSyncMove::<u32, MAX_STREAMS>::new();
for stream_id in 0..MAX_STREAMS as u32 {
vacant_streams.publish_movable(stream_id);
}
vacant_streams
},
used_streams: UnsafeCell::new(Box::pin([u32::MAX; MAX_STREAMS])),
used_streams_count: AtomicU32::new(0),
created_streams_count: AtomicU32::new(0),
finished_streams_count: AtomicU32::new(0),
wakers: UnsafeCell::new(Box::pin((0..MAX_STREAMS).map(|_| Option::<Waker>::None).collect::<Vec<_>>().try_into().unwrap())),
wakers_lock: AtomicBool::new(false),
keep_streams_running: UnsafeCell::new(Box::pin([false; MAX_STREAMS])),
streams_lock: AtomicBool::new(false),
streams_manager_name: streams_manager_name.into(),
}
}
/// Returns the name this streams manager was created with.
pub fn name(&self) -> &str {
    &self.streams_manager_name
}
/// Reserves a vacant stream id, flags it as "should keep running" and returns it.
///
/// # Panics
///
/// Panics if all `MAX_STREAMS` ids are already in use.
pub fn create_stream_id(&self) -> u32 {
    let stream_id = match self.vacant_streams.consume_movable() {
        Some(stream_id) => stream_id,
        None => panic!("StreamsManager: '{}' has a MAX_STREAMS of {MAX_STREAMS} -- which just got exhausted: stats: {} streams were created; {} dropped. Please, increase the limit or fix the LOGIC BUG!",
                       self.streams_manager_name, self.created_streams_count.load(Relaxed), self.finished_streams_count.load(Relaxed)),
    };
    // Counters are only touched once an id was really obtained: if the panic above is caught
    // (e.g. by an async runtime isolating a task), `running_streams_count()` must not stay
    // inflated by a stream that never existed.
    self.created_streams_count.fetch_add(1, Relaxed);
    self.used_streams_count.fetch_add(1, Relaxed);
    let keep_streams_running = unsafe { &mut *self.keep_streams_running.get() };
    keep_streams_running[stream_id as usize] = true;
    self.sync_vacant_and_used_streams();
    stream_id
}
/// Wakes the task registered for `stream_id`, if any waker is currently registered for it.
///
/// The waker slot is read while holding `wakers_lock`, since `register_stream_waker()` and
/// `report_stream_dropped()` replace (and drop) the slot's content under that same lock.
/// The waker is cloned under the lock and woken only after the lock is released, so the wake
/// itself never runs with the lock held.
///
/// # Panics
///
/// Panics if `stream_id >= MAX_STREAMS`.
#[inline(always)]
pub fn wake_stream(&self, stream_id: u32) {
    let wakers = unsafe { &*self.wakers.get() };
    // bounds-checked before locking, so an invalid id cannot panic with the lock held
    let slot = &wakers[stream_id as usize];
    ogre_sync::lock(&self.wakers_lock);
    let registered_waker = slot.clone();
    ogre_sync::unlock(&self.wakers_lock);
    if let Some(waker) = registered_waker {
        waker.wake();
    }
}
/// Calls `wake_stream()` for every possible stream id in `0..MAX_STREAMS`
/// (ids without a registered waker are simply skipped by `wake_stream()`).
pub fn wake_all_streams(&self) {
    // NOTE: no `u32::MAX` sentinel check is needed here -- the range is exclusive, so it can never yield it
    for stream_id in 0..MAX_STREAMS as u32 {
        self.wake_stream(stream_id);
    }
}
/// Returns the "keep running" flag of `stream_id`: `true` after `create_stream_id()` handed the id out,
/// `false` once `cancel_stream()` was called for it.
///
/// # Panics
///
/// Panics if `stream_id >= MAX_STREAMS`.
#[inline(always)]
pub fn keep_stream_running(&self, stream_id: u32) -> bool {
    let keep_streams_running = unsafe { &*self.keep_streams_running.get() };
    // checked indexing: the id comes from the caller, so it cannot be trusted to be in range
    keep_streams_running[stream_id as usize]
}
/// Returns `true` if the "keep running" flag is set for at least one stream id in `0..MAX_STREAMS`.
#[inline(always)]
pub fn is_any_stream_running(&self) -> bool {
    // NOTE: no `u32::MAX` sentinel check is needed here -- the range is exclusive, so it can never yield it
    (0..MAX_STREAMS as u32).any(|stream_id| self.keep_stream_running(stream_id))
}
/// Clears the "keep running" flag of `stream_id` and then wakes that stream.
///
/// # Panics
///
/// Panics if `stream_id >= MAX_STREAMS`.
pub fn cancel_stream(&self, stream_id: u32) {
    {
        let running_flags = unsafe { &mut *self.keep_streams_running.get() };
        let flag = &mut running_flags[stream_id as usize];
        *flag = false;
    }
    // wake the stream after the flag was cleared
    self.wake_stream(stream_id);
}
/// Calls `cancel_stream()` for every id listed in `used_streams`, stopping at the first
/// `u32::MAX` entry (the padding that follows the ids in use).
pub fn cancel_all_streams(&self) {
    let ids_in_use = unsafe { &*self.used_streams.get() };
    ids_in_use
        .iter()
        .copied()
        .take_while(|&stream_id| stream_id != u32::MAX)
        .for_each(|stream_id| self.cancel_stream(stream_id));
}
/// Stores a clone of `waker` as the waker of `stream_id` -- unless the one already stored would wake
/// the same task, in which case nothing is done.
///
/// After a new waker is stored, the task is woken once.
/// NOTE(review): presumably this is to avoid losing a wake-up that raced with the registration -- confirm.
///
/// # Panics
///
/// Panics if `stream_id >= MAX_STREAMS`.
#[inline(always)]
pub fn register_stream_waker(&self, stream_id: u32, waker: &Waker) {
    let wakers = unsafe { &mut *self.wakers.get() };
    // checked indexing: the id comes from the caller, so it cannot be trusted to be in range
    let slot = &mut wakers[stream_id as usize];
    let already_registered = match &*slot {
        Some(registered_waker) => registered_waker.will_wake(waker),
        None => false,
    };
    if already_registered {
        return;
    }
    // clone before locking, to keep the locked region as short as possible
    let new_waker = waker.clone();
    ogre_sync::lock(&self.wakers_lock);
    *slot = Some(new_waker);
    ogre_sync::unlock(&self.wakers_lock);
    // `waker` and the stored clone wake the same task; using the parameter avoids
    // touching the shared slot after the lock was released
    waker.wake_by_ref();
}
/// Releases everything held for `stream_id`: drops its registered waker, clears its "keep running" flag,
/// updates the counters and gives the id back to the pool of vacant ids.
///
/// # Panics
///
/// Panics if `stream_id >= MAX_STREAMS`.
pub fn report_stream_dropped(&self, stream_id: u32) {
    let wakers = unsafe { &mut *self.wakers.get() };
    // bounds-checked before locking, so an invalid id cannot panic with the lock held
    let waker_slot = &mut wakers[stream_id as usize];
    ogre_sync::lock(&self.wakers_lock);
    *waker_slot = None;
    ogre_sync::unlock(&self.wakers_lock);
    // A dropped stream is no longer running, even if it was never cancelled -- otherwise
    // `is_any_stream_running()` would keep reporting it. This must happen BEFORE the id is
    // published as vacant, or it could overwrite the flag of a new stream reusing the id.
    let keep_streams_running = unsafe { &mut *self.keep_streams_running.get() };
    keep_streams_running[stream_id as usize] = false;
    self.finished_streams_count.fetch_add(1, Relaxed);
    self.used_streams_count.fetch_sub(1, Relaxed);
    self.vacant_streams.publish_movable(stream_id);
    self.sync_vacant_and_used_streams();
}
/// Gives read access to the array of ids in use -- the ids come first,
/// and the remaining positions hold `u32::MAX`.
#[inline(always)]
pub fn used_streams(&self) -> &[u32; MAX_STREAMS] {
    let ids_in_use: &[u32; MAX_STREAMS] = unsafe { &*self.used_streams.get() };
    ids_in_use
}
/// Rebuilds `used_streams` from the current contents of `vacant_streams`: every id in `0..MAX_STREAMS`
/// that is NOT vacant is written, in ascending order, to the beginning of the array; the remaining
/// positions are filled with `u32::MAX`.
///
/// The rebuild happens between `ogre_sync::lock()` / `unlock()` on `streams_lock`.
/// NOTE(review): the writes are unchecked -- this relies on `peek_remaining()` yielding unique ids,
/// all below `MAX_STREAMS`; confirm that holds while other threads publish/consume concurrently.
#[inline(always)]
fn sync_vacant_and_used_streams(&self) {
let used_streams = unsafe { &mut * self.used_streams.get() };
ogre_sync::lock(&self.streams_lock);
// snapshot of the vacant ids, sorted so the gaps between them (the used ids) can be walked in order
let mut vacant = unsafe { self.vacant_streams.peek_remaining().concat() };
vacant.sort_unstable();
let mut vacant_iter = vacant.iter();
// `i` is the next candidate stream id to classify
let mut i = 0;
// despite the name, this is the INDEX in `used_streams` of the last written position (-1: none written yet)
let mut last_used_stream_id = -1;
while i < MAX_STREAMS as u32 {
match vacant_iter.next() {
Some(next_vacant_stream_id) => {
// every id between the candidate and the next vacant id is in use
for used_stream_id in i .. *next_vacant_stream_id {
last_used_stream_id += 1;
unsafe { *used_streams.get_unchecked_mut(last_used_stream_id as usize) = used_stream_id };
}
// skip over the vacant id itself
i = *next_vacant_stream_id + 1;
}
None => {
// no more vacant ids: every remaining candidate is in use
last_used_stream_id += 1;
unsafe { *used_streams.get_unchecked_mut(last_used_stream_id as usize) = i };
i += 1;
}
}
}
// pad the rest of the array with the "no stream id" marker
for i in (last_used_stream_id + 1) as usize .. MAX_STREAMS {
unsafe { *used_streams.get_unchecked_mut(i) = u32::MAX };
}
ogre_sync::unlock(&self.streams_lock);
}
/// Repeatedly wakes all streams (sleeping 1ms between rounds) until `pending_items_counter()` reports 0
/// or `timeout` elapses -- `Duration::ZERO` meaning "no timeout".
///
/// Returns the number of items still pending: 0 if everything was flushed; otherwise, the count
/// sampled at the moment the timeout was detected.
pub async fn flush(&self, timeout: Duration, pending_items_counter: impl Fn() -> u32) -> u32 {
    let has_timeout = timeout != Duration::ZERO;
    // the clock is read lazily, so the "nothing pending" fast path never touches it
    let mut started_at: Option<Instant> = None;
    loop {
        if pending_items_counter() == 0 {
            break 0
        }
        if has_timeout && started_at.is_none() {
            // start counting BEFORE the first sleep, so that sleep is charged against the timeout
            started_at = Some(Instant::now());
        }
        self.wake_all_streams();
        tokio::time::sleep(Duration::from_millis(1)).await;
        if let Some(started_at) = started_at {
            if started_at.elapsed() > timeout {
                // re-sample: items may have been consumed during the last sleep
                break pending_items_counter()
            }
        }
    }
}
/// Flushes pending items (see `flush()`), cancels `stream_id` and then keeps waking it (sleeping 1ms
/// between attempts) until its id shows up among the vacant ones.
///
/// Returns `true` if the id became vacant, or `false` if `timeout` -- measured from the start of
/// this call, `Duration::ZERO` meaning "no timeout" -- elapsed first.
pub async fn end_stream(&self, stream_id: u32, timeout: Duration, pending_items_counter: impl Fn() -> u32) -> bool {
    // tells whether `stream_id` is currently listed among the vacant ids
    let id_was_released = || {
        let vacant_slices = unsafe { self.vacant_streams.peek_remaining() };
        vacant_slices.iter().any(|vacant_ids| vacant_ids.contains(&stream_id))
    };
    debug_assert!(!id_was_released(), "Mutiny's `StreamsManager` @ end_stream(): BUG! stream_id {stream_id} is not running! Running ones are {:?}",
                  unsafe { &*self.used_streams.get() }.iter().filter(|&id| *id != u32::MAX).collect::<Vec<&u32>>());
    let started_at = Instant::now();
    let has_timeout = timeout != Duration::ZERO;
    self.flush(timeout, pending_items_counter).await;
    self.cancel_stream(stream_id);
    loop {
        self.wake_stream(stream_id);
        tokio::time::sleep(Duration::from_millis(1)).await;
        if id_was_released() {
            return true;
        }
        if has_timeout && started_at.elapsed() > timeout {
            return false;
        }
    }
}
/// Flushes pending items, cancels every stream in use and waits (polling every 1ms) until
/// `running_streams_count()` drops to 0 or `timeout` -- measured from the start of this call,
/// `Duration::ZERO` meaning "no timeout" -- elapses.
///
/// Returns the number of streams still running when the wait ended.
pub async fn end_all_streams(&self, timeout: Duration, pending_items_counter: impl Fn() -> u32) -> u32 {
    let started_at = Instant::now();
    let has_timeout = timeout != Duration::ZERO;
    let unflushed_items = self.flush(timeout, &pending_items_counter).await;
    if unflushed_items > 0 {
        // the first flush left items behind: cancel the streams and flush once more
        self.cancel_all_streams();
        self.flush(timeout, &pending_items_counter).await;
    }
    self.cancel_all_streams();
    loop {
        if self.running_streams_count() == 0 {
            break;
        }
        if has_timeout && started_at.elapsed() > timeout {
            break;
        }
        tokio::time::sleep(Duration::from_millis(1)).await;
    }
    self.running_streams_count()
}
/// Returns how many stream ids are currently handed out (created but not yet reported as dropped).
#[inline(always)]
pub fn running_streams_count(&self) -> u32 {
    let currently_in_use = self.used_streams_count.load(Relaxed);
    currently_in_use
}
}
// The `UnsafeCell` fields prevent `Sync` from being derived automatically, so thread-safety is asserted
// manually here.
// NOTE(review): writes to the waker slots and the rebuild of `used_streams` are done under
// `wakers_lock` / `streams_lock`, but some accessors read (and the "keep running" flags are written)
// without any lock -- the soundness of these impls should be confirmed against how callers use this type.
unsafe impl<const MAX_STREAMS: usize> Send for StreamsManagerBase<MAX_STREAMS> {}
unsafe impl<const MAX_STREAMS: usize> Sync for StreamsManagerBase<MAX_STREAMS> {}