use std::collections::BTreeMap;
use std::os::fd::AsRawFd;
use std::sync::{Arc, Condvar, Mutex};
use libc::{MAP_FAILED, MAP_FIXED, MAP_SHARED, PROT_READ, PROT_WRITE};
use libc::{c_uchar, c_void, size_t};
use log::error;
use crate::stream::{Tag, TagPos};
use crate::{Error, Result};
// Upper bound on how long the blocking `wait_for_write`/`wait_for_read` calls
// wait on the condition variable before returning.
const SYNC_SLEEP_TIME: std::time::Duration = std::time::Duration::from_millis(100);
// Async counterpart: how long `wait_for_write_async`/`wait_for_read_async`
// sleep before giving up on a notification.
#[cfg(feature = "async")]
const ASYNC_SLEEP_TIME: tokio::time::Duration = tokio::time::Duration::from_millis(100);
/// One `mmap()`ed region. The region is `munmap()`ed when the value is dropped.
#[derive(Debug)]
struct Map {
/// Start address of the mapping, as returned by `mmap()`.
base: *mut c_uchar,
/// Number of bytes handed to `munmap()` on drop. `Circ::new` shrinks this
/// after remapping the second half of the region.
len: usize,
}
impl Map {
    /// Map the first `len` bytes of `f`, letting the kernel pick the address.
    ///
    /// # Errors
    /// Returns an error if `mmap()` fails.
    fn new(f: &std::fs::File, len: usize) -> Result<Self> {
        let anywhere = std::ptr::null_mut();
        Self::with_addr(f, len, anywhere)
    }

    /// Map the first `len` bytes of `f` read/write and shared.
    ///
    /// A null `ptr` lets the kernel choose the address. A non-null `ptr`
    /// requests exactly that address with `MAP_FIXED`.
    ///
    /// # Errors
    /// Returns an error if `mmap()` fails, or if a fixed-address request came
    /// back at a different address (the stray mapping is unmapped first).
    ///
    /// # Panics
    /// Panics if `mmap()` returns null, or if unmapping a misplaced mapping
    /// fails.
    fn with_addr(f: &std::fs::File, len: usize, ptr: *mut c_void) -> Result<Self> {
        let fixed = !ptr.is_null();
        let flags = if fixed {
            MAP_SHARED | MAP_FIXED
        } else {
            MAP_SHARED
        };
        let prot = PROT_READ | PROT_WRITE;
        // SAFETY: `f` is borrowed for the duration of the call, so its
        // descriptor is open. With a non-null `ptr` the caller is responsible
        // for that address range being theirs to replace.
        let buf = unsafe { libc::mmap(ptr, len as size_t, prot, flags, f.as_raw_fd(), 0) };
        if buf == MAP_FAILED {
            // Capture errno right away, before anything else can clobber it.
            let e = std::io::Error::last_os_error();
            return Err(Error::msg(format!(
                "mmap(){}: {e}",
                if fixed { " at fixed address" } else { "" }
            )));
        }
        assert!(!buf.is_null());
        if fixed && buf != ptr {
            // The mapping landed somewhere other than requested: undo it and
            // report the failure.
            // SAFETY: `buf`/`len` describe the mapping created just above.
            let rc = unsafe { libc::munmap(buf, len as size_t) };
            if rc != 0 {
                let e = std::io::Error::last_os_error();
                panic!("Failed to unmap buffer just mapped in the failure path: {e}");
            }
            return Err(Error::msg("mmap() allocated in the wrong place"));
        }
        let base = buf.cast::<c_uchar>();
        Ok(Self { base, len })
    }
}
impl Drop for Map {
    /// Unmap the region described by `base`/`len`.
    ///
    /// # Panics
    /// Panics if `munmap()` reports an error.
    fn drop(&mut self) {
        let addr = self.base.cast::<c_void>();
        // SAFETY: `addr`/`len` describe the mapping this value was built from
        // (or the part of it still owned after `Circ::new` shrank `len`).
        let rc = unsafe { libc::munmap(addr, self.len) };
        if rc == 0 {
            return;
        }
        let e = std::io::Error::last_os_error();
        panic!("munmap() failed on circular buffer: {e}");
    }
}
/// A region of memory in which the same file contents are mapped twice, back
/// to back, so a range that runs past the end of the first copy continues
/// into the second copy of the same bytes.
#[derive(Debug)]
pub struct Circ {
/// Size in bytes of the whole doubled window (twice the size given to
/// `Circ::new`).
len: usize,
/// Mapping covering the first half of the window; `map.base` is the start
/// of the whole window.
map: Map,
/// Mapping covering the second half. Never read; held so that it is
/// unmapped when the `Circ` is dropped.
_map2: Map, }
impl Circ {
    /// Create a double-mapped window over a temp file of `size` bytes.
    ///
    /// The file is first grown to `2 * size` and mapped in one piece to
    /// reserve the address range. The second half is then remapped (at a
    /// fixed address) onto the start of the file, and the file is shrunk
    /// back to `size`.
    ///
    /// # Errors
    /// Returns an error if the temp file cannot be created or resized, or if
    /// either mapping fails.
    ///
    /// # Panics
    /// `size * 2` is a plain multiplication, so it panics on overflow when
    /// overflow checks are enabled.
    fn new(size: usize) -> Result<Self> {
        let size_x2 = size * 2;
        let f = tempfile::tempfile().map_err(|e| Error::file_io(e, std::env::temp_dir()))?;
        // Resize the backing file, wrapping any I/O error with context.
        let set_len = |size: usize| -> Result<()> {
            f.set_len(size as u64)
                .map_err(|e| Error::wrap(e, format!("circular buffer temp file set_len({size})")))
        };

        // Reserve the full window with a single mapping.
        set_len(size_x2)?;
        let mut map = Map::new(&f, size_x2)?;

        // Replace the second half with another view of the file's start.
        let base_addr = map.base as libc::uintptr_t;
        let second = (base_addr + size as libc::uintptr_t) as *mut c_void;
        let map2 = Map::with_addr(&f, size, second)?;

        // The first `Map` now only owns the first half.
        map.len = size;
        set_len(size)?;

        Ok(Self {
            len: size_x2,
            map,
            _map2: map2,
        })
    }

    /// Size in bytes of one copy of the data (half of the doubled window).
    #[must_use]
    pub fn total_size(&self) -> usize {
        self.len / 2
    }

    /// View entries `start..end` of the doubled window as a mutable `[T]`.
    ///
    /// Indices are in units of `T` and may extend into the second copy.
    /// Debug builds additionally assert that the window size is a multiple
    /// of `size_of::<T>()` and that the range is no longer than one copy.
    ///
    /// # Panics
    /// Panics if `start..end` is not a valid range within the doubled window.
    #[allow(clippy::mut_from_ref)]
    #[must_use]
    fn full_buffer<T>(&self, start: usize, end: usize) -> &mut [T] {
        let ez = std::mem::size_of::<T>();
        debug_assert!(self.len.is_multiple_of(ez));
        debug_assert!(
            end - start <= self.len / ez / 2,
            "requested {start} to {end} ({} entries) of {} but len is {}",
            end - start,
            ez,
            self.len
        );
        let first = self.map.base.cast::<T>();
        // SAFETY (review): `first` is the start of a window of `self.len`
        // mapped bytes. Aliasing between slices handed out from `&self` is
        // not prevented here and is left to the callers.
        let whole = unsafe { std::slice::from_raw_parts_mut(first, self.len / ez) };
        &mut whole[start..end]
    }
}
// `Circ` contains raw pointers (through `Map`), so it is not `Send`/`Sync`
// automatically; these impls assert both by hand.
// NOTE(review): `full_buffer` hands out `&mut [T]` from `&self`, so soundness
// presumably relies on callers only touching the ranges tracked by
// `BufferState` — confirm against the callers.
unsafe impl Send for Circ {}
unsafe impl Sync for Circ {}
/// Bookkeeping for a `Buffer`, kept behind the mutex in `BufferInner`.
#[derive(Debug)]
struct BufferState {
// Fields, in order:
// * `rpos`: index (in entries) where the readable data starts.
// * `wpos`: index (in entries) where the next write starts.
// * `used`: number of entries currently readable.
// * `circ_len`: the `size` value given to `Buffer::new`; `capacity()` divides
//   it by `member_size`.
// * `member_size`: `size_of::<T>()` for the buffer's entry type.
// * `tags`: pending tags, keyed by their position in the buffer.
rpos: usize, wpos: usize, used: usize, circ_len: usize, member_size: usize, tags: BTreeMap<TagPos, Vec<Tag>>,
}
impl BufferState {
    /// Half-open entry range that may currently be written: it starts at
    /// `wpos` and is `free()` entries long. The end may exceed `capacity()`.
    #[must_use]
    fn write_range(&self) -> (usize, usize) {
        let begin = self.wpos;
        (begin, begin + self.free())
    }

    /// Half-open entry range that may currently be read: it starts at `rpos`
    /// and is `used` entries long. The end may exceed `capacity()`.
    #[must_use]
    fn read_range(&self) -> (usize, usize) {
        let begin = self.rpos;
        (begin, begin + self.used)
    }

    /// Total number of entries the buffer holds.
    #[must_use]
    fn capacity(&self) -> usize {
        self.circ_len / self.member_size
    }

    /// Length, in entries, of the current write range.
    #[must_use]
    fn write_capacity(&self) -> usize {
        let (begin, end) = self.write_range();
        end - begin
    }

    /// Number of entries not currently holding readable data.
    #[must_use]
    fn free(&self) -> usize {
        self.capacity() - self.used
    }
}
/// Read handle onto the entry range `start..end` of a `Buffer`.
pub struct BufferReader<T> {
/// Buffer the range refers to.
parent: Arc<Buffer<T>>,
/// First entry index of the range.
start: usize,
/// One past the last entry index of the range.
end: usize,
}
impl<T: Copy> BufferReader<T> {
#[must_use]
fn new(parent: Arc<Buffer<T>>, start: usize, end: usize) -> Self {
Self { parent, start, end }
}
#[must_use]
pub fn slice(&self) -> &[T] {
self.parent.slice(self.start, self.end)
}
pub fn iter(&self) -> std::slice::Iter<'_, T> {
self.slice().iter()
}
pub fn consume(self, n: usize) {
self.parent.consume(n);
}
#[must_use]
pub fn len(&self) -> usize {
self.end - self.start
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.end == self.start
}
}
impl<T: Copy> std::ops::Index<usize> for BufferReader<T> {
    type Output = T;

    /// Entry `n` of the reader's range, counted from the start of the range.
    ///
    /// # Panics
    /// Panics if `n` is not below the number of entries in the range.
    fn index(&self, n: usize) -> &Self::Output {
        let entries = self.slice();
        &entries[n]
    }
}
/// Write handle onto the entry range `start..end` of a `Buffer`.
pub struct BufferWriter<T> {
/// Buffer the range refers to.
parent: Arc<Buffer<T>>,
/// First entry index of the range.
start: usize,
/// One past the last entry index of the range.
end: usize,
}
impl<T: Copy> BufferWriter<T> {
#[must_use]
fn new(parent: Arc<Buffer<T>>, start: usize, end: usize) -> BufferWriter<T> {
assert!(end >= start);
Self { parent, start, end }
}
#[must_use]
pub fn slice(&mut self) -> &mut [T] {
self.parent.slice_mut(self.start, self.end)
}
pub fn fill_from_iter(&mut self, src: impl IntoIterator<Item = T>) {
for (place, item) in self.slice().iter_mut().zip(src) {
*place = item;
}
}
pub fn fill_from_slice(&mut self, src: &[T]) {
self.slice()[..src.len()].copy_from_slice(src);
}
pub fn produce(self, n: usize, tags: &[Tag]) {
self.parent.produce(n, tags);
}
#[must_use]
pub fn len(&self) -> usize {
self.end - self.start
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.end == self.start
}
}
/// Shared synchronisation state of a `Buffer`.
#[derive(Debug)]
struct BufferInner {
/// Positions, fill level and tags, guarded by a mutex.
lock: Mutex<BufferState>,
/// Signalled by both `produce` and `consume`; the blocking `wait_for_*`
/// calls wait on it.
cv: Condvar,
/// Notified by `produce`; awaited by `wait_for_read_async`.
#[cfg(feature = "async")]
acvr: tokio::sync::Notify,
/// Notified by `consume`; awaited by `wait_for_write_async`.
#[cfg(feature = "async")]
acvw: tokio::sync::Notify,
}
/// Circular buffer of `T` entries with tag tracking, backed by a `Circ`.
#[derive(Debug)]
pub struct Buffer<T> {
/// Identifier taken from `crate::NEXT_STREAM_ID` at construction.
id: usize,
/// Positions, tags and wait/notify primitives.
state: Arc<BufferInner>,
/// The double-mapped memory holding the entries.
circ: Circ,
/// `size_of::<T>()`, used to convert byte sizes into entry counts.
member_size: usize,
/// Ties the buffer to its entry type without storing a `T`.
dummy: std::marker::PhantomData<T>,
}
impl<T> Buffer<T> {
    /// Create an empty buffer backed by `size` bytes of circular memory.
    ///
    /// # Errors
    /// Returns an error if the underlying `Circ` cannot be created.
    pub fn new(size: usize) -> Result<Self> {
        let member_size = std::mem::size_of::<T>();
        // Take the id before creating the mapping, so an id is used up even
        // if the mapping fails.
        let id = crate::NEXT_STREAM_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let initial = BufferState {
            rpos: 0,
            wpos: 0,
            used: 0,
            circ_len: size,
            member_size,
            tags: BTreeMap::new(),
        };
        let state = Arc::new(BufferInner {
            lock: Mutex::new(initial),
            cv: Condvar::new(),
            #[cfg(feature = "async")]
            acvr: tokio::sync::Notify::new(),
            #[cfg(feature = "async")]
            acvw: tokio::sync::Notify::new(),
        });
        let circ = Circ::new(size)?;
        Ok(Self {
            id,
            state,
            circ,
            member_size,
            dummy: std::marker::PhantomData,
        })
    }

    /// Identifier assigned to this buffer at construction.
    #[must_use]
    pub(crate) fn id(&self) -> usize {
        self.id
    }

    /// Total number of `T` entries the buffer can hold.
    #[must_use]
    pub fn total_size(&self) -> usize {
        let bytes = self.circ.total_size();
        bytes / self.member_size
    }

    /// Number of entries that can currently be written.
    #[must_use]
    pub fn free(&self) -> usize {
        let state = self.state.lock.lock().unwrap();
        state.free()
    }

    /// Block until at least `need` entries are free, or `SYNC_SLEEP_TIME`
    /// has passed.
    ///
    /// Returns the number of free entries when the wait ended, which can be
    /// less than `need` after a timeout.
    #[must_use]
    pub fn wait_for_write(&self, need: usize) -> usize {
        let guard = self.state.lock.lock().unwrap();
        let (state, _timeout) = self
            .state
            .cv
            .wait_timeout_while(guard, SYNC_SLEEP_TIME, |s| s.free() < need)
            .unwrap();
        state.free()
    }

    /// Wait for a consume notification or `ASYNC_SLEEP_TIME`, whichever
    /// comes first. Returns 1 when notified and 0 on timeout; the argument
    /// is ignored.
    #[cfg(feature = "async")]
    pub async fn wait_for_write_async(&self, _need: usize) -> usize {
        let timeout = tokio::time::sleep(ASYNC_SLEEP_TIME);
        tokio::select! {
            _ = timeout => 0,
            _ = self.state.acvw.notified() => 1,
        }
    }

    /// Block until at least `need` entries are readable, or `SYNC_SLEEP_TIME`
    /// has passed.
    ///
    /// Returns the number of readable entries when the wait ended, which can
    /// be less than `need` after a timeout.
    #[must_use]
    pub fn wait_for_read(&self, need: usize) -> usize {
        let guard = self.state.lock.lock().unwrap();
        let (state, _timeout) = self
            .state
            .cv
            .wait_timeout_while(guard, SYNC_SLEEP_TIME, |s| s.used < need)
            .unwrap();
        state.used
    }

    /// Wait for a produce notification or `ASYNC_SLEEP_TIME`, whichever
    /// comes first. Returns 1 when notified and 0 on timeout; the argument
    /// is ignored.
    #[cfg(feature = "async")]
    pub async fn wait_for_read_async(&self, _need: usize) -> usize {
        let timeout = tokio::time::sleep(ASYNC_SLEEP_TIME);
        tokio::select! {
            _ = timeout => 0,
            _ = self.state.acvr.notified() => 1,
        }
    }
}
impl<T> Buffer<T> {
    /// Whether the buffer currently holds no readable entries.
    #[must_use]
    pub(crate) fn is_empty(&self) -> bool {
        self.state.lock.lock().unwrap().used == 0
    }
}
impl<T: Copy> Buffer<T> {
    /// Mark the first `n` readable entries as consumed.
    ///
    /// Advances the read position, drops every tag positioned inside the
    /// consumed range, and wakes anything waiting on the buffer.
    ///
    /// `consume(0)` leaves positions and tags untouched but still notifies
    /// waiters.
    ///
    /// # Panics
    /// Panics if `n` exceeds the number of readable entries.
    pub(in crate::nowasm::circular_buffer) fn consume(&self, n: usize) {
        use std::ops::Bound::{Excluded, Included};
        let mut s = self.state.lock.lock().unwrap();
        assert!(
            n <= s.used,
            "trying to consume {}, but only have {}",
            n,
            s.used
        );
        // With n == 0 the new position equals the old one, which is
        // indistinguishable from consuming a completely full buffer. Without
        // this guard the wrap-around branch below would delete every pending
        // tag even though nothing was consumed.
        if n > 0 {
            let newpos = (s.rpos + n) % s.capacity();
            let keys: Vec<TagPos> = if newpos > s.rpos {
                // Consumed range is contiguous: [rpos, newpos).
                s.tags
                    .range((Included(s.rpos), Excluded(newpos)))
                    .map(|(k, _)| *k)
                    .collect()
            } else {
                // Consumed range wraps: [rpos, capacity) followed by [0, newpos).
                let mut t: Vec<TagPos> = s
                    .tags
                    .range((Included(s.rpos), Excluded(s.capacity())))
                    .map(|(k, _)| *k)
                    .collect();
                t.extend(
                    s.tags
                        .range((Included(0), Excluded(newpos)))
                        .map(|(k, _)| *k),
                );
                t
            };
            for k in keys {
                s.tags.remove(&k);
            }
            s.rpos = newpos;
            s.used -= n;
        }
        self.state.cv.notify_all();
        #[cfg(feature = "async")]
        self.state.acvw.notify_one();
    }

    /// Mark `n` entries at the write position as produced and attach `tags`.
    ///
    /// Tag positions are relative to the start of the written range; they are
    /// stored relative to the buffer. Calling with `n == 0` does nothing
    /// (and logs an error if tags were supplied).
    ///
    /// # Panics
    /// Panics if fewer than `n` entries are free. In debug builds, also
    /// panics if `n == 0` and `tags` is non-empty.
    pub(in crate::nowasm::circular_buffer) fn produce(&self, n: usize, tags: &[Tag]) {
        if n == 0 {
            debug_assert!(tags.is_empty());
            if !tags.is_empty() {
                error!("produce() called on a stream with 0 entries, but non-empty tags: {tags:?}");
            }
            return;
        }
        let mut s = self.state.lock.lock().unwrap();
        assert!(
            s.free() >= n,
            "tried to produce {n}, but only {} is free out of {}",
            s.free(),
            self.total_size()
        );
        assert!(
            s.write_capacity() >= n,
            "can't produce that much. {} < {}",
            s.write_capacity(),
            n
        );
        for tag in tags {
            // Translate from "relative to this write" to "position in buffer".
            let pos = (tag.pos() + s.wpos) % s.capacity();
            let tag = Tag::new(pos, tag.key(), tag.val().clone());
            s.tags.entry(pos).or_default().push(tag);
        }
        s.wpos = (s.wpos + n) % s.capacity();
        s.used += n;
        self.state.cv.notify_all();
        #[cfg(feature = "async")]
        self.state.acvr.notify_waiters();
    }

    /// Shared view of entries `start..end` of the doubled window.
    #[must_use]
    pub(crate) fn slice(&self, start: usize, end: usize) -> &[T] {
        self.circ.full_buffer::<T>(start, end)
    }

    /// Mutable view of entries `start..end` of the doubled window.
    #[must_use]
    pub(crate) fn slice_mut(&self, start: usize, end: usize) -> &mut [T] {
        self.circ.full_buffer::<T>(start, end)
    }

    /// Get a reader over everything currently readable, plus the tags that
    /// fall in that range.
    ///
    /// Returned tag positions are relative to the start of the reader, and
    /// the tags are sorted by position.
    ///
    /// # Errors
    /// Always returns `Ok` as written.
    pub fn read_buf(self: Arc<Self>) -> Result<(BufferReader<T>, Vec<Tag>)> {
        let s = self.state.lock.lock().unwrap();
        let (start, end) = s.read_range();
        let mut tags = Vec::with_capacity(s.tags.len());
        for (n, ts) in &s.tags {
            let modded_n: usize = *n % s.capacity();
            if end < s.capacity() && start < s.capacity() {
                // Readable range does not wrap.
                // NOTE(review): `end` is exclusive, yet `modded_n > end` keeps
                // a tag sitting exactly at `end` — confirm whether intended.
                if modded_n < start || modded_n > end {
                    continue;
                }
            } else {
                // Readable range reaches or wraps past the end of the buffer.
                assert!(start < s.capacity());
                if modded_n > (end % s.capacity()) && modded_n < start {
                    continue;
                }
            }
            for tag in ts {
                tags.push(Tag::new(
                    // Rebase onto the start of the readable range.
                    (tag.pos() + s.capacity() - start) % s.capacity(),
                    tag.key(),
                    tag.val().clone(),
                ));
            }
        }
        drop(s);
        tags.sort_by_key(Tag::pos);
        Ok((BufferReader::new(self, start, end), tags))
    }

    /// Get a writer over everything currently writable.
    ///
    /// # Errors
    /// Always returns `Ok` as written.
    pub fn write_buf(self: Arc<Self>) -> Result<BufferWriter<T>> {
        let s = self.state.lock.lock().unwrap();
        let (start, end) = s.write_range();
        drop(s);
        Ok(BufferWriter::new(self, start, end))
    }
}
// Unit tests for the mapping, the double-mapped window and the buffer API.
#[cfg(test)]
mod tests {
use super::*;
use crate::Float;
use crate::stream::TagValue;
// Mapping 1024 bytes of `/dev/null` is expected to fail.
#[test]
fn map_empty_file() -> Result<()> {
let f = std::fs::File::open("/dev/null")?;
assert!(Map::new(&f, 1024).is_err());
Ok(())
}
// A 4096-byte `Circ` viewed as `u32` allows ranges of up to 1024 entries,
// including ones that end at the very end of the doubled window.
#[test]
fn circ_reqlen() -> Result<()> {
let circ = Circ::new(4096)?;
assert_eq!(circ.total_size(), 4096);
assert!(circ.full_buffer::<u32>(0, 0).is_empty());
assert_eq!(circ.full_buffer::<u32>(0, 1).len(), 1);
assert_eq!(circ.full_buffer::<u32>(0, 1024).len(), 1024);
assert_eq!(circ.full_buffer::<u32>(1000, 1200).len(), 200);
assert_eq!(circ.full_buffer::<u32>(2040, 2048).len(), 8);
Ok(())
}
// NOTE(review): presumably the expected panic is the `size * 2` overflow in
// `Circ::new`, which only traps when overflow checks are enabled — confirm
// for release-mode test runs.
#[test]
#[should_panic]
fn circ_reqlen_too_big() {
let _ = Circ::new(usize::MAX);
}
// Half of `usize::MAX` doubles without overflow but is expected to be
// rejected with an error.
#[test]
fn circ_reqlen_too_big_half() {
assert!(Circ::new(usize::MAX >> 1).is_err());
}
// Requesting 1025 `u32` entries (more than one copy holds) from the start.
// NOTE(review): the only check that rejects this range is a `debug_assert!`,
// so the expected panic presumably needs debug assertions enabled.
#[test]
#[should_panic]
fn circ_reqlen_too_big_beginning() {
if let Ok(circ) = Circ::new(4096) {
let _ = circ.full_buffer::<u32>(0, 1025);
}
}
// Same over-long request, starting in the middle of the window.
#[test]
#[should_panic]
fn circ_reqlen_too_big_middle() {
if let Ok(circ) = Circ::new(4096) {
let _ = circ.full_buffer::<u32>(10, 1024 + 11);
}
}
// A range ending past the 2048-entry doubled window must panic.
#[test]
#[should_panic]
fn circ_past_end() {
if let Ok(circ) = Circ::new(4096) {
let _ = circ.full_buffer::<u32>(2040, 2049);
}
}
// Writes through the first copy must be visible through the second copy,
// at a different address.
#[test]
fn circ_circular() -> Result<()> {
let circ = Circ::new(4096)?;
assert_eq!(circ.total_size(), 4096);
let buf = circ.full_buffer::<u32>(0, 4);
let buf2 = circ.full_buffer::<u32>(1024, 1028);
assert_eq!(buf, [0, 0, 0, 0]);
assert_eq!(buf2, [0, 0, 0, 0]);
buf[0] = 3;
buf[1] = 2;
buf[2] = 1;
buf[3] = 42;
// Keep the compiler from reordering the writes past the reads below.
std::sync::atomic::compiler_fence(std::sync::atomic::Ordering::SeqCst);
assert_eq!(buf, [3, 2, 1, 42]);
assert_eq!(buf2, [3, 2, 1, 42]);
assert_ne!(buf.as_ptr(), buf2.as_ptr());
Ok(())
}
// End-to-end produce/consume cycle on a byte buffer, including tags and a
// write that wraps around the end of the buffer.
#[test]
fn typical() -> Result<()> {
let b = Arc::new(Buffer::new(4096)?);
assert!(b.clone().read_buf()?.0.is_empty());
assert_eq!(b.clone().write_buf()?.len(), 4096);
// Produce a single tagged entry.
{
let mut buf = b.clone().write_buf()?;
buf.slice()[0] = 123;
buf.produce(1, &[Tag::new(0, "start", TagValue::Bool(true))]);
assert_eq!(b.clone().read_buf()?.0.slice(), vec![123]);
assert_eq!(
b.clone().read_buf()?.1,
vec![Tag::new(0, "start", TagValue::Bool(true))]
);
assert_eq!(b.clone().write_buf()?.len(), 4095);
}
// Consuming it empties the buffer and drops the tag.
b.consume(1);
assert!(b.clone().read_buf()?.0.is_empty());
assert!(b.clone().read_buf()?.1.is_empty());
assert_eq!(b.clone().write_buf()?.len(), 4096);
// Fill most of the buffer.
{
let n = 4000;
let mut wb = b.clone().write_buf()?;
for i in 0..n {
wb.slice()[i] = (i & 0xff) as u8;
}
wb.produce(n, &[Tag::new(1, "foo", TagValue::String("bar".into()))]);
let (rb, rt) = b.clone().read_buf()?;
assert_eq!(rb.len(), n);
for i in 0..n {
assert_eq!(rb.slice()[i], (i & 0xff) as u8);
}
assert_eq!(rt, vec![Tag::new(1, "foo", TagValue::String("bar".into()))]);
assert_eq!(b.clone().write_buf()?.len(), 4096 - n);
}
b.consume(4000);
// This write starts at position 4001 and wraps past the end.
{
let n = 100;
let mut wb = b.clone().write_buf()?;
for i in 0..n {
wb.slice()[i] = ((n - i) & 0xff) as u8;
}
wb.produce(
n,
&[
Tag::new(0, "first", TagValue::Bool(true)),
Tag::new(99, "last", TagValue::Bool(false)),
],
);
let (rb, rt) = b.clone().read_buf()?;
assert_eq!(rb.len(), n);
for i in 0..n {
assert_eq!(rb.slice()[i], ((n - i) & 0xff) as u8);
}
assert_eq!(
rt,
vec![
Tag::new(0, "first", TagValue::Bool(true)),
Tag::new(99, "last", TagValue::Bool(false))
]
);
// Dropping a reader without consuming leaves the data in place.
drop(rb);
assert_eq!(b.clone().read_buf()?.0.len(), 100);
assert_eq!(b.clone().write_buf()?.len(), 3996);
}
// Consume everything through the reader handle.
{
let (rb, _) = b.clone().read_buf()?;
let n = rb.len();
rb.consume(n);
assert_eq!(b.clone().read_buf()?.0.len(), 0);
assert!(b.clone().read_buf()?.1.is_empty());
assert_eq!(b.clone().write_buf()?.len(), 4096);
}
Ok(())
}
// Two consecutive writes: the second write's tag position is reported
// relative to the start of all readable data.
#[test]
fn two_writes() -> Result<()> {
let b: Arc<Buffer<u8>> = Arc::new(Buffer::new(4096)?);
{
let mut buf = b.clone().write_buf()?;
buf.slice()[1] = 123;
buf.produce(10, &[Tag::new(1, "first", TagValue::Bool(true))]);
assert_eq!(
b.clone().read_buf()?.0.slice(),
vec![0, 123, 0, 0, 0, 0, 0, 0, 0, 0]
);
assert_eq!(
b.clone().read_buf()?.1,
vec![Tag::new(1, "first", TagValue::Bool(true))]
);
assert_eq!(b.clone().write_buf()?.len(), 4086);
}
{
let mut buf = b.clone().write_buf()?;
buf.slice()[2] = 42;
buf.produce(5, &[Tag::new(2, "second", TagValue::Bool(false))]);
assert_eq!(
b.clone().read_buf()?.0.slice(),
vec![0, 123, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42, 0, 0]
);
assert_eq!(
b.clone().read_buf()?.1,
vec![
Tag::new(1, "first", TagValue::Bool(true)),
Tag::new(12, "second", TagValue::Bool(false))
]
);
assert_eq!(b.clone().write_buf()?.len(), 4081);
}
b.consume(15);
assert!(b.clone().read_buf()?.0.is_empty());
assert!(b.clone().read_buf()?.1.is_empty());
assert_eq!(b.clone().write_buf()?.len(), 4096);
Ok(())
}
// Filling the buffer exactly to capacity and draining it again.
#[test]
fn exact_overflow() -> Result<()> {
let b: Arc<Buffer<u8>> = Arc::new(Buffer::new(4096)?);
assert!(b.clone().read_buf()?.0.is_empty());
assert_eq!(b.clone().write_buf()?.len(), 4096);
b.clone().write_buf()?.produce(4096, &[]);
assert_eq!(b.clone().read_buf()?.0.len(), 4096);
assert_eq!(b.clone().write_buf()?.len(), 0);
b.clone().read_buf()?.0.consume(4096);
assert!(b.clone().read_buf()?.0.is_empty());
assert_eq!(b.clone().write_buf()?.len(), 4096);
Ok(())
}
// Same flow with `Float` entries; the assertions expect 1024 entries to
// fit in 4096 bytes.
#[test]
fn with_float() -> Result<()> {
let b: Arc<Buffer<Float>> = Arc::new(Buffer::new(4096)?);
assert!(b.clone().read_buf()?.0.is_empty());
assert_eq!(b.clone().write_buf()?.len(), 1024);
{
let mut wb = b.clone().write_buf()?;
wb.slice()[0] = 123.321;
wb.produce(1, &[]);
}
assert_eq!(b.clone().read_buf()?.0.slice(), vec![123.321]);
assert_eq!(b.clone().write_buf()?.len(), 1023);
b.clone().read_buf()?.0.consume(1);
assert!(b.clone().read_buf()?.0.is_empty());
assert_eq!(b.clone().write_buf()?.len(), 1024);
{
let n = 1000;
let mut wb = b.clone().write_buf()?;
for i in 0..n {
wb.slice()[i] = i as Float;
}
wb.produce(n, &[]);
assert_eq!(b.clone().read_buf()?.0.len(), n);
for i in 0..n {
assert_eq!(b.clone().read_buf()?.0.slice()[i], i as Float);
}
assert_eq!(b.clone().write_buf()?.len(), 24);
}
b.clone().read_buf()?.0.consume(1000);
// This write wraps around the end of the buffer.
{
let n = 100;
let mut wb = b.clone().write_buf()?;
for i in 0..n {
wb.slice()[i] = (n - i) as Float;
}
wb.produce(n, &[]);
assert_eq!(b.clone().read_buf()?.0.len(), n);
for i in 0..n {
assert_eq!(b.clone().read_buf()?.0.slice()[i], (n - i) as Float);
}
}
assert_eq!(b.clone().read_buf()?.0.len(), 100);
assert_eq!(b.clone().write_buf()?.len(), 1024 - 100);
Ok(())
}
}