use std::ffi::CString;
use lazy_static::lazy_static;
use crate::{
concurrent::{atomic_buffer::AtomicBuffer, reports::loss_report_descriptor::LossReportEntryDefn},
offset_of,
utils::{
bit_utils,
misc::CACHE_LINE_LENGTH,
types::{Index, I32_SIZE},
},
};
/// Layout description of a loss report entry plus the helper that builds the
/// loss report file path.
pub mod loss_report_descriptor {
    use std::mem::size_of;

    use crate::utils::types::Index;

    /// Fixed-size header found at the start of every loss report entry.
    ///
    /// The struct uses C field ordering and is packed to at most 4-byte
    /// alignment, so its size is the plain sum of its fields
    /// (four `i64` + two `i32`).
    #[repr(C, packed(4))]
    #[derive(Copy, Clone)]
    pub struct LossReportEntryDefn {
        /// Observation counter; `read` treats a value of 0 as "no more entries".
        pub observation_count: i64,
        pub total_bytes_lost: i64,
        // NOTE(review): timestamp units/epoch are not visible from this file — confirm.
        pub first_observation_timestamp: i64,
        pub last_observation_timestamp: i64,
        pub session_id: i32,
        pub stream_id: i32,
    }

    /// Offset (relative to the start of an entry) of the channel string, which
    /// begins immediately after the fixed-size header.
    pub(super) const CHANNEL_OFFSET: Index = size_of::<LossReportEntryDefn>() as Index;

    /// File name of the loss report inside the Aeron directory.
    const LOSS_REPORT_FILE_NAME: &str = "loss-report.dat";

    /// Returns the loss report path for `aeron_directory_name`, i.e. the
    /// directory name and the report file name joined by a single `/`.
    #[inline]
    pub fn file(aeron_directory_name: &str) -> String {
        [aeron_directory_name, LOSS_REPORT_FILE_NAME].join("/")
    }
}
lazy_static! {
    /// Byte offset of the `observation_count` field inside `LossReportEntryDefn`.
    pub static ref OBSERVATION_COUNT_OFFSET: Index = offset_of!(LossReportEntryDefn, observation_count);
    /// Boundary to which the start of every loss report entry is aligned: one cache line.
    // The alignment must be the VALUE of CACHE_LINE_LENGTH. Taking
    // `size_of_val(&CACHE_LINE_LENGTH)` yields the byte width of the constant's
    // integer type instead, which makes the reader step to wrong entry offsets.
    pub static ref ENTRY_ALIGNMENT: Index = CACHE_LINE_LENGTH as Index;
}
/// Callback invoked by `read` once per loss report entry.
///
/// Arguments, in order, as passed by `read`:
/// 1. the entry's observation count (read with volatile semantics),
/// 2. a copy of the entry's fixed-size header,
/// 3. the channel string of the entry,
/// 4. the source string of the entry.
pub type LossConsumerHandler = fn(i64, LossReportEntryDefn, CString , CString );
/// Walks the loss report entries stored in `buffer` and hands each one to `consumer`.
///
/// Scanning starts at offset 0 and stops when the offset reaches the buffer
/// capacity or when an entry whose observation count is 0 is encountered.
/// For every entry the observation count is read with `get_volatile`, then the
/// channel string, the source string and a copy of the fixed-size header are
/// read, and `consumer` is called with all four values.
///
/// Returns the number of entries passed to `consumer`.
#[inline]
pub fn read(buffer: &AtomicBuffer, consumer: LossConsumerHandler) -> i32 {
    let channel_base = loss_report_descriptor::CHANNEL_OFFSET;
    let limit = buffer.capacity();

    let mut cursor = 0;
    let mut entries_seen = 0;

    loop {
        if cursor >= limit {
            break;
        }

        // The observation count is read first; zero marks the end of the used region.
        let observations = buffer.get_volatile::<i64>(cursor + *OBSERVATION_COUNT_OFFSET);
        if observations == 0 {
            break;
        }
        entries_seen += 1;

        // Channel string follows the fixed-size header.
        let channel = buffer.get_string(cursor + channel_base);
        let channel_len = channel.as_bytes().len() as Index;

        // Source string follows the channel: skip one i32 (presumably the channel's
        // length prefix — confirm against `AtomicBuffer::get_string`) plus the channel bytes.
        // NOTE(review): no padding is assumed between channel and source; verify this
        // matches the layout produced by the writer.
        let source_position = cursor + channel_base + I32_SIZE + channel_len;
        let source = buffer.get_string(source_position);
        let source_len = source.as_bytes().len() as Index;

        let header = buffer.get::<loss_report_descriptor::LossReportEntryDefn>(cursor);
        consumer(observations, header, channel, source);

        // Header + two i32 slots + both string payloads, then advance to the
        // next entry boundary as computed by `bit_utils::align`.
        let unaligned_length = channel_base + I32_SIZE * 2 + channel_len + source_len;
        cursor += bit_utils::align(unaligned_length, *ENTRY_ALIGNMENT);
    }

    entries_seen
}