use std::{collections::HashMap, ops::Deref, sync::Arc};
use zenoh_core::{bail, zerror};
use zenoh_result::ZResult;
use crate::{
api::{
client::shm_segment::ShmSegment,
client_storage::ShmClientStorage,
common::types::{ProtocolID, SegmentID},
provider::chunk::ChunkDescriptor,
},
metadata::subscription::GLOBAL_METADATA_SUBSCRIPTION,
watchdog::confirmator::GLOBAL_CONFIRMATOR,
ShmBufInfo, ShmBufInner,
};
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ShmReader {
client_storage: Arc<ShmClientStorage>,
}
impl Deref for ShmReader {
type Target = ShmClientStorage;
fn deref(&self) -> &Self::Target {
&self.client_storage
}
}
impl ShmReader {
pub fn new(client_storage: Arc<ShmClientStorage>) -> Self {
Self { client_storage }
}
pub fn read_shmbuf(&self, info: ShmBufInfo) -> ZResult<ShmBufInner> {
let metadata = GLOBAL_METADATA_SUBSCRIPTION.read().link(&info.metadata)?;
let confirmed_metadata = Arc::new(GLOBAL_CONFIRMATOR.read().add(metadata));
let data_descriptor = confirmed_metadata.owned.header().data_descriptor();
let segment = self.ensure_data_segment(
confirmed_metadata
.owned
.header()
.protocol
.load(std::sync::atomic::Ordering::Relaxed),
&data_descriptor,
)?;
let buf = segment.map(data_descriptor.chunk)?;
let shmb = ShmBufInner {
metadata: confirmed_metadata,
buf,
info: info.clone(),
};
match shmb.is_valid() {
true => Ok(shmb),
false => bail!("Buffer is invalidated"),
}
}
fn ensure_data_segment(
&self,
protocol_id: ProtocolID,
descriptor: &ChunkDescriptor,
) -> ZResult<Arc<dyn ShmSegment>> {
let id = GlobalDataSegmentID::new(protocol_id, descriptor.segment);
let r_guard = self.segments.read().unwrap();
if let Some(val) = r_guard.get(&id) {
return Ok(val.clone());
}
drop(r_guard);
let client = self
.clients
.get_clients()
.get(&id.protocol)
.ok_or_else(|| zerror!("Unsupported SHM protocol: {}", id.protocol))?;
let mut w_guard = self.segments.write().unwrap();
match w_guard.entry(id) {
std::collections::hash_map::Entry::Occupied(occupied) => Ok(occupied.get().clone()),
std::collections::hash_map::Entry::Vacant(vacant) => {
let new_segment = client.attach(descriptor.segment)?;
Ok(vacant.insert(new_segment).clone())
}
}
}
}
#[derive(Debug)]
pub(crate) struct ClientStorage<Inner>
where
Inner: Sized,
{
clients: HashMap<ProtocolID, Inner>,
}
impl<Inner: Sized> ClientStorage<Inner> {
pub(crate) fn new(clients: HashMap<ProtocolID, Inner>) -> Self {
Self { clients }
}
pub(crate) fn get_clients(&self) -> &HashMap<ProtocolID, Inner> {
&self.clients
}
}
unsafe impl<Inner: Send> Send for ClientStorage<Inner> {}
unsafe impl<Inner: Sync> Sync for ClientStorage<Inner> {}
#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct GlobalDataSegmentID {
protocol: ProtocolID,
segment: SegmentID,
}
impl GlobalDataSegmentID {
fn new(protocol: ProtocolID, segment: SegmentID) -> Self {
Self { protocol, segment }
}
}