use std::{
any::Any,
borrow::{Borrow, Cow},
collections::VecDeque,
convert::TryInto,
fmt::Debug,
hash::{Hash, Hasher},
ops::{Deref, DerefMut},
sync::{Arc, RwLock, Weak},
};
use zenoh_collections::{IntHashMap, IntHashSet, SingleOrBoxHashSet};
use zenoh_protocol::{
core::{key_expr::keyexpr, ExprId, Region, WireExpr},
network::{
self,
declare::{self, queryable::ext::QueryableInfoType, Declare, DeclareBody, DeclareKeyExpr},
interest::InterestId,
Mapping, RequestId,
},
};
use zenoh_sync::{get_mut_unchecked, Cache, CacheValueType};
use super::{
face::FaceState,
pubsub::SubscriberInfo,
tables::{TablesData, TablesLock},
};
use crate::net::routing::{
dispatcher::{
face::{Face, FaceId},
region::RegionMap,
tables::{RoutingExpr, Tables},
},
interceptor::{InterceptorTrait, InterceptorsChain},
RoutingContext,
};
pub(crate) type NodeId = u16;
pub(crate) const DEFAULT_NODE_ID: NodeId = network::ext::NodeIdType::<0>::DEFAULT.node_id;
#[inline]
pub(crate) const fn node_id_as_source(node_id: NodeId) -> Option<NodeId> {
if node_id != DEFAULT_NODE_ID {
Some(node_id)
} else {
None
}
}
#[derive(Clone, Debug)]
pub(crate) struct Direction {
pub(crate) dst_face: Arc<FaceState>,
pub(crate) wire_expr: WireExpr<'static>,
pub(crate) node_id: NodeId,
}
#[derive(Clone, Debug)]
pub(crate) struct QueryDirection {
pub(crate) dir: Direction,
pub(crate) rid: RequestId,
}
pub(crate) type Route = Vec<Direction>;
#[derive(Clone, Debug)]
pub(crate) struct QueryTargetQabl {
pub(crate) dir: Direction,
pub(crate) info: Option<QueryableInfoType>,
pub(crate) region: Region,
}
impl QueryTargetQabl {
pub(crate) fn new(
ctx: &FaceContext,
expr: &RoutingExpr,
complete: bool,
region: &Region,
) -> Option<Self> {
let qabl = ctx.qabl?;
let wire_expr = expr.get_best_key(ctx.face.id);
Some(Self {
dir: Direction {
dst_face: ctx.face.clone(),
wire_expr: wire_expr.to_owned(),
node_id: DEFAULT_NODE_ID,
},
info: Some(QueryableInfoType {
complete: complete && qabl.complete,
distance: if ctx.face.is_local { 0 } else { 1 },
}),
region: *region,
})
}
}
pub(crate) type QueryTargetQablSet = Vec<QueryTargetQabl>;
pub(crate) struct RouteBuilder<T> {
route: Vec<T>,
faces: IntHashSet<usize>,
}
impl<T> RouteBuilder<T> {
pub(crate) fn new() -> Self {
Self {
route: Vec::new(),
faces: IntHashSet::new(),
}
}
pub(crate) fn insert(&mut self, face_id: FaceId, direction: impl FnOnce() -> T) {
if self.faces.insert(face_id) {
self.route.push(direction());
}
}
pub(crate) fn try_insert(&mut self, face_id: usize, direction: impl FnOnce() -> Option<T>) {
if !self.faces.contains(&face_id) {
if let Some(direction) = direction() {
self.faces.insert(face_id);
self.route.push(direction);
}
}
}
pub(crate) fn build(self) -> Vec<T> {
self.route
}
}
pub(crate) struct InterceptorCache(Cache<Option<Box<dyn Any + Send + Sync>>>);
pub(crate) type InterceptorCacheValueType = CacheValueType<Option<Box<dyn Any + Send + Sync>>>;
impl InterceptorCache {
pub(crate) fn new(value: Option<Box<dyn Any + Send + Sync>>, version: usize) -> Self {
Self(Cache::<Option<Box<dyn Any + Send + Sync>>>::new(
value, version,
))
}
pub(crate) fn empty() -> Self {
InterceptorCache::new(None, 0)
}
#[inline]
fn value(
&self,
interceptor: &InterceptorsChain,
resource: &Resource,
) -> Option<InterceptorCacheValueType> {
self.0
.value(interceptor.version, || {
interceptor.compute_keyexpr_cache(resource.keyexpr()?)
})
.ok()
}
}
pub(crate) struct FaceContext {
pub(crate) face: Arc<FaceState>,
pub(crate) local_expr_id: Option<ExprId>,
pub(crate) remote_expr_id: Option<ExprId>,
pub(crate) subs: Option<SubscriberInfo>,
pub(crate) qabl: Option<QueryableInfoType>,
pub(crate) token: bool,
pub(crate) subscriber_interest_finalized: bool,
pub(crate) queryable_interest_finalized: bool,
pub(crate) in_interceptor_cache: InterceptorCache,
pub(crate) e_interceptor_cache: InterceptorCache,
}
impl FaceContext {
pub(crate) fn new(face: Arc<FaceState>) -> Self {
Self {
face,
local_expr_id: None,
remote_expr_id: None,
subs: None,
qabl: None,
token: false,
subscriber_interest_finalized: false,
queryable_interest_finalized: false,
in_interceptor_cache: InterceptorCache::empty(),
e_interceptor_cache: InterceptorCache::empty(),
}
}
}
pub type RoutesVersion = u64;
pub(crate) struct Routes<T> {
mapping: RegionMap<NodeIdMap<T>>,
version: u64,
}
pub(crate) type NodeIdMap<T> = Vec<Option<T>>;
impl<T> Default for Routes<T> {
fn default() -> Self {
Self {
mapping: RegionMap::default(),
version: 0,
}
}
}
impl<T> Routes<T> {
pub(crate) fn clear(&mut self) {
self.mapping.clear();
}
#[inline]
pub(crate) fn get_route(
&self,
version: RoutesVersion,
region: &Region,
node_id: NodeId,
) -> Option<&T> {
if version != self.version {
return None;
}
self.mapping
.get(region)
.and_then(|rs| rs.get(node_id as usize))
.and_then(|r| r.as_ref())
}
#[inline]
pub(crate) fn set_route(
&mut self,
version: RoutesVersion,
region: &Region,
node_id: NodeId,
route: T,
) {
if self.version != version {
self.clear();
self.version = version;
}
let aux = |routes: &mut NodeIdMap<T>| {
routes.resize_with(node_id as usize + 1, || None);
routes[node_id as usize] = Some(route);
};
if let Some(routes) = self.mapping.get_mut(region) {
aux(routes);
} else {
let mut routes = NodeIdMap::default();
aux(&mut routes);
self.mapping.insert(*region, routes);
}
}
}
pub(crate) fn get_or_set_route<T: Clone>(
routes: &RwLock<Routes<T>>,
version: RoutesVersion,
region: &Region,
node_id: NodeId,
compute_route: impl FnOnce() -> T,
) -> T {
if let Some(route) = routes.read().unwrap().get_route(version, region, node_id) {
return route.clone();
}
let mut routes = routes.write().unwrap();
if let Some(route) = routes.get_route(version, region, node_id) {
return route.clone();
}
let route = compute_route();
routes.set_route(version, region, node_id, route.clone());
route
}
pub(crate) type DataRoutes = Routes<Arc<Route>>;
pub(crate) type QueryRoutes = Routes<Arc<QueryTargetQablSet>>;
pub(crate) struct ResourceContext {
pub(crate) matches: Vec<Weak<Resource>>,
pub(crate) hats: RegionMap<HatResourceContext>,
pub(crate) data_routes: RwLock<DataRoutes>,
#[cfg(feature = "stats")]
pub(crate) stats_keys: zenoh_stats::StatsKeyCache,
}
impl ResourceContext {
pub(crate) fn new(hat: RegionMap<HatResourceContext>) -> ResourceContext {
ResourceContext {
matches: Vec::new(),
hats: hat,
data_routes: Default::default(),
#[cfg(feature = "stats")]
stats_keys: Default::default(),
}
}
pub(crate) fn disable_data_routes(&mut self) {
self.data_routes.get_mut().unwrap().clear();
}
}
pub(crate) struct HatResourceContext {
pub(crate) ctx: Box<dyn Any + Send + Sync>,
pub(crate) data_routes: RwLock<DataRoutes>,
pub(crate) query_routes: RwLock<QueryRoutes>,
}
impl HatResourceContext {
pub(crate) fn new(ctx: Box<dyn Any + Send + Sync>) -> Self {
HatResourceContext {
ctx,
data_routes: Default::default(),
query_routes: Default::default(),
}
}
pub(crate) fn disable_data_routes(&mut self) {
self.data_routes.get_mut().unwrap().clear();
}
pub(crate) fn disable_query_routes(&mut self) {
self.query_routes.get_mut().unwrap().clear();
}
}
pub struct Resource {
pub(crate) parent: Option<Arc<Resource>>,
pub(crate) expr: String,
pub(crate) suffix: usize,
pub(crate) nonwild_prefix: Option<Arc<Resource>>,
pub(crate) children: SingleOrBoxHashSet<Child>,
pub(crate) ctx: Option<Box<ResourceContext>>,
pub(crate) face_ctxs: IntHashMap<FaceId, Arc<FaceContext>>,
}
impl PartialEq for Resource {
fn eq(&self, other: &Self) -> bool {
self.expr() == other.expr()
}
}
impl Eq for Resource {}
impl Hash for Resource {
fn hash<H: Hasher>(&self, state: &mut H) {
self.expr().hash(state);
}
}
impl Debug for Resource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.expr())
}
}
#[derive(Clone)]
pub(crate) struct Child(Arc<Resource>);
impl Deref for Child {
type Target = Arc<Resource>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for Child {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl PartialEq for Child {
fn eq(&self, other: &Self) -> bool {
self.0.suffix() == other.0.suffix()
}
}
impl Eq for Child {}
impl Hash for Child {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.suffix().hash(state);
}
}
impl Borrow<str> for Child {
fn borrow(&self) -> &str {
self.0.suffix()
}
}
impl Resource {
fn new(parent: &Arc<Resource>, suffix: &str, context: Option<ResourceContext>) -> Resource {
let nonwild_prefix = match &parent.nonwild_prefix {
None => {
if suffix.contains('*') {
Some(parent.clone())
} else {
None
}
}
Some(prefix) => Some(prefix.clone()),
};
Resource {
parent: Some(parent.clone()),
expr: parent.expr.clone() + suffix,
suffix: parent.expr.len(),
nonwild_prefix,
children: SingleOrBoxHashSet::new(),
ctx: context.map(Box::new),
face_ctxs: IntHashMap::new(),
}
}
pub fn expr(&self) -> &str {
&self.expr
}
pub fn keyexpr(&self) -> Option<&keyexpr> {
if self.parent.is_none() {
None
} else {
unsafe { Some(keyexpr::from_str_unchecked(&self.expr)) }
}
}
pub fn suffix(&self) -> &str {
&self.expr[self.suffix..]
}
#[inline(always)]
pub(crate) fn context(&self) -> &ResourceContext {
self.ctx.as_ref().unwrap()
}
#[inline(always)]
pub(crate) fn context_mut(&mut self) -> &mut ResourceContext {
self.ctx.as_mut().unwrap()
}
pub(crate) fn matches(&self, other: &Resource) -> bool {
debug_assert!(self.ctx.is_some());
self.ctx.as_ref().is_some_and(|ctx| {
ctx.matches
.iter()
.any(|m| m.upgrade().is_some_and(|m| &*m == other))
})
}
pub fn nonwild_prefix(res: &Arc<Resource>) -> (Option<Arc<Resource>>, String) {
match &res.nonwild_prefix {
None => (Some(res.clone()), "".to_string()),
Some(nonwild_prefix) => {
if !nonwild_prefix.expr().is_empty() {
(
Some(nonwild_prefix.clone()),
res.expr[nonwild_prefix.expr.len()..].to_string(),
)
} else {
(None, res.expr().to_string())
}
}
}
}
pub fn root() -> Arc<Resource> {
Arc::new(Resource {
parent: None,
expr: String::from(""),
suffix: 0,
nonwild_prefix: None,
children: SingleOrBoxHashSet::new(),
ctx: None,
face_ctxs: IntHashMap::new(),
})
}
#[tracing::instrument(level = "trace")]
pub fn clean(res: &mut Arc<Resource>) {
let mut resclone = res.clone();
let mutres = get_mut_unchecked(&mut resclone);
if let Some(ref mut parent) = mutres.parent {
tracing::trace!(strong_count = Arc::strong_count(res));
if Arc::strong_count(res) <= 3 && res.children.is_empty() {
tracing::debug!("Unregister resource {}", res.expr());
if let Some(context) = mutres.ctx.as_mut() {
for match_ in &mut context.matches {
let mut match_ = match_.upgrade().unwrap();
if !Arc::ptr_eq(&match_, res) {
let mutmatch = get_mut_unchecked(&mut match_);
if let Some(ctx) = mutmatch.ctx.as_mut() {
ctx.matches
.retain(|x| !Arc::ptr_eq(&x.upgrade().unwrap(), res));
}
}
}
}
mutres.nonwild_prefix.take();
{
get_mut_unchecked(parent).children.remove(res.suffix());
}
Resource::clean(parent);
}
}
}
pub fn close(self: &mut Arc<Resource>) {
let r = get_mut_unchecked(self);
for mut c in r.children.drain() {
Self::close(&mut c);
}
r.parent.take();
r.nonwild_prefix.take();
r.ctx.take();
r.face_ctxs.clear();
}
#[cfg(test)]
pub fn print_tree(from: &Arc<Resource>) -> String {
let mut result = from.expr().to_string();
result.push('\n');
for child in from.children.iter() {
result.push_str(&Resource::print_tree(child));
}
result
}
#[tracing::instrument(level = "debug", skip(tables), ret)]
pub fn make_resource(
tables: &mut Tables,
from: &mut Arc<Resource>,
mut suffix: &str,
) -> Arc<Resource> {
if !suffix.is_empty() && !suffix.starts_with('/') {
if let Some(parent) = &mut from.parent.clone() {
return Resource::make_resource(tables, parent, &[from.suffix(), suffix].concat());
}
}
let mut from = from.clone();
while let Some((chunk, rest)) = Self::split_first_chunk(suffix) {
if let Some(child) = get_mut_unchecked(&mut from).children.get(chunk) {
from = child.0.clone();
} else {
let new = Arc::new(Resource::new(&from, chunk, None));
if rest.is_empty() {
tracing::debug!("Register resource {}", new.expr());
}
get_mut_unchecked(&mut from)
.children
.insert(Child(new.clone()));
from = new;
};
suffix = rest;
}
let hat = tables
.hats
.map_ref(|d| HatResourceContext::new(d.new_resource()));
Resource::upgrade_resource(&mut from, hat);
from
}
#[inline]
pub fn get_resource_ref<'a>(
mut from: &'a Arc<Resource>,
mut suffix: &str,
) -> Option<&'a Arc<Resource>> {
if !suffix.is_empty() && !suffix.starts_with('/') {
if let Some(parent) = &from.parent {
return Resource::get_resource_ref(parent, &[from.suffix(), suffix].concat());
}
}
while let Some((chunk, rest)) = Self::split_first_chunk(suffix) {
(from, suffix) = (from.children.get(chunk)?, rest);
}
Some(from)
}
#[inline]
pub fn get_resource(from: &Arc<Resource>, suffix: &str) -> Option<Arc<Resource>> {
Self::get_resource_ref(from, suffix).cloned()
}
#[inline(always)]
fn split_first_chunk(suffix: &str) -> Option<(&str, &str)> {
if suffix.is_empty() {
return None;
}
Some(match suffix[1..].find('/') {
Some(idx) => suffix.split_at(idx + 1),
None => (suffix, ""),
})
}
#[inline]
pub fn decl_key(res: &Arc<Resource>, face: &mut Arc<FaceState>) -> WireExpr<'static> {
if face.is_local {
return res.expr().to_string().into();
}
let (nonwild_prefix, wildsuffix) = Resource::nonwild_prefix(res);
match nonwild_prefix {
Some(mut nonwild_prefix) => {
if let Some(ctx) = get_mut_unchecked(&mut nonwild_prefix)
.face_ctxs
.get(&face.id)
{
if let Some(expr_id) = ctx.remote_expr_id {
return WireExpr {
scope: expr_id,
suffix: wildsuffix.into(),
mapping: Mapping::Receiver,
};
}
if let Some(expr_id) = ctx.local_expr_id {
return WireExpr {
scope: expr_id,
suffix: wildsuffix.into(),
mapping: Mapping::Sender,
};
}
}
if face.region.bound().is_north()
|| face.remote_key_interests.values().any(|res| {
res.as_ref()
.map(|res| res.matches(&nonwild_prefix))
.unwrap_or(true)
})
{
let ctx = get_mut_unchecked(&mut nonwild_prefix)
.face_ctxs
.entry(face.id)
.or_insert_with(|| Arc::new(FaceContext::new(face.clone())));
let expr_id = face.get_next_local_id();
get_mut_unchecked(ctx).local_expr_id = Some(expr_id);
get_mut_unchecked(face)
.local_mappings
.insert(expr_id, nonwild_prefix.clone());
face.primitives.send_declare(RoutingContext::with_expr(
&mut Declare {
interest_id: None,
ext_qos: declare::ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: declare::ext::NodeIdType::DEFAULT,
body: DeclareBody::DeclareKeyExpr(DeclareKeyExpr {
id: expr_id,
wire_expr: nonwild_prefix.expr().to_string().into(),
}),
},
nonwild_prefix.expr().to_string(),
));
face.update_interceptors_caches(&mut nonwild_prefix);
WireExpr {
scope: expr_id,
suffix: wildsuffix.into(),
mapping: Mapping::Sender,
}
} else {
res.expr().to_string().into()
}
}
None => wildsuffix.into(),
}
}
pub fn get_best_key<'a>(&self, suffix: &'a str, sid: usize) -> WireExpr<'a> {
fn get_wire_expr<'a>(
prefix: &Resource,
suffix: impl FnOnce() -> Cow<'a, str>,
sid: usize,
) -> Option<WireExpr<'a>> {
let ctx = prefix.face_ctxs.get(&sid)?;
let (scope, mapping) = match (ctx.remote_expr_id, ctx.local_expr_id) {
(Some(expr_id), _) => (expr_id, Mapping::Receiver),
(_, Some(expr_id)) => (expr_id, Mapping::Sender),
_ => return None,
};
Some(WireExpr {
scope,
suffix: suffix(),
mapping,
})
}
fn get_best_child_key<'a>(
mut prefix: &Resource,
suffix: &'a str,
sid: usize,
) -> Option<WireExpr<'a>> {
let mut suffix_rest = suffix;
while let Some((chunk, rest)) = Resource::split_first_chunk(suffix_rest) {
match prefix.children.get(chunk) {
Some(child) => prefix = child,
None => break,
}
suffix_rest = rest;
}
while suffix_rest != suffix {
if let Some(wire_expr) = get_wire_expr(prefix, || suffix_rest.into(), sid) {
return Some(wire_expr);
}
suffix_rest = &suffix[suffix.len() - suffix_rest.len() - prefix.suffix().len()..];
prefix = prefix.parent.as_ref().unwrap();
}
None
}
fn get_best_parent_key<'a>(
prefix: &Resource,
suffix: &'a str,
sid: usize,
mut parent: &Resource,
) -> Option<WireExpr<'a>> {
loop {
let parent_suffix = || [&prefix.expr[parent.expr.len()..], suffix].concat().into();
if let Some(wire_expr) = get_wire_expr(parent, parent_suffix, sid) {
return Some(wire_expr);
}
match parent.parent.as_ref() {
Some(p) => parent = p,
None => return None,
}
}
}
get_best_child_key(self, suffix, sid)
.or_else(|| get_wire_expr(self, || suffix.into(), sid))
.or_else(|| get_best_parent_key(self, suffix, sid, self.parent.as_ref()?))
.unwrap_or_else(|| [&self.expr, suffix].concat().into())
}
pub fn get_matches(tables: &TablesData, key_expr: &keyexpr) -> Vec<Weak<Resource>> {
pub fn visit_nodes<T>(node: T, mut visit: impl FnMut(T, &mut VecDeque<T>)) {
let mut nodes = VecDeque::from([node]);
while let Some(node) = nodes.pop_front() {
visit(node, &mut nodes);
}
}
fn get_matches_from(
key_expr: &keyexpr,
from: &Arc<Resource>,
matches: &mut Vec<Weak<Resource>>,
) {
visit_nodes((key_expr, from), |(key_expr, from), nodes| {
if from.parent.is_none() || from.suffix() == "/" {
for child in from.children.iter() {
nodes.push_back((key_expr, child));
}
return;
}
let suffix: &keyexpr = from
.suffix()
.strip_prefix('/')
.unwrap_or(from.suffix())
.try_into()
.unwrap();
let (ke_chunk, ke_rest) = match key_expr.split_once('/') {
Some((chunk, rest)) => unsafe {
(
keyexpr::from_str_unchecked(chunk),
Some(keyexpr::from_str_unchecked(rest)),
)
},
None => (key_expr, None),
};
let ke_chunk_intersects_suffix = ke_chunk.intersects(suffix);
let ke_chunk_is_wild = ke_chunk.as_bytes() == b"**";
let suffix_is_wild = suffix.as_bytes() == b"**";
match ke_rest {
None => {
if ke_chunk_intersects_suffix {
if from.ctx.is_some() {
matches.push(Arc::downgrade(from));
}
if let Some(child) =
from.children.get("/**").or_else(|| from.children.get("**"))
{
if child.ctx.is_some() {
matches.push(Arc::downgrade(child))
}
}
}
if (ke_chunk_is_wild && ke_chunk_intersects_suffix) || suffix_is_wild {
for child in from.children.iter() {
nodes.push_back((key_expr, child));
}
}
}
Some(rest) => {
if ke_chunk_intersects_suffix
&& rest.as_bytes() == b"**"
&& from.ctx.is_some()
{
matches.push(Arc::downgrade(from));
}
for child in from.children.iter() {
if (ke_chunk_is_wild && ke_chunk_intersects_suffix) || suffix_is_wild {
nodes.push_back((key_expr, child));
} else if ke_chunk_intersects_suffix {
nodes.push_back((rest, child));
}
}
if (suffix_is_wild && ke_chunk_intersects_suffix) || ke_chunk_is_wild {
nodes.push_back((rest, from));
}
}
};
})
}
let mut matches = Vec::new();
get_matches_from(key_expr, &tables.root_res, &mut matches);
matches.sort_unstable_by_key(Weak::as_ptr);
matches.dedup_by_key(|res| Weak::as_ptr(res));
matches
}
pub fn match_resource(
_tables: &TablesData,
res: &mut Arc<Resource>,
matches: Vec<Weak<Resource>>,
) {
if res.ctx.is_some() {
for match_ in &matches {
let mut match_ = match_.upgrade().unwrap();
get_mut_unchecked(&mut match_)
.context_mut()
.matches
.push(Arc::downgrade(res));
}
get_mut_unchecked(res).context_mut().matches = matches;
} else {
tracing::error!("Call match_resource() on context less res {}", res.expr());
}
}
pub fn upgrade_resource(res: &mut Arc<Resource>, hat: RegionMap<HatResourceContext>) {
if res.ctx.is_none() {
get_mut_unchecked(res).ctx = Some(Box::new(ResourceContext::new(hat)));
}
}
pub(crate) fn get_ingress_cache(
&self,
face: &Face,
interceptor: &InterceptorsChain,
) -> Option<InterceptorCacheValueType> {
self.face_ctxs
.get(&face.state.id)
.and_then(|ctx| ctx.in_interceptor_cache.value(interceptor, self))
}
pub(crate) fn get_egress_cache(
&self,
face: &Face,
interceptor: &InterceptorsChain,
) -> Option<InterceptorCacheValueType> {
self.face_ctxs
.get(&face.state.id)
.and_then(|ctx| ctx.e_interceptor_cache.value(interceptor, self))
}
}
pub(crate) fn register_expr(
tables: &TablesLock,
face: &mut Arc<FaceState>,
expr_id: ExprId,
expr: &WireExpr,
) {
let rtables = zread!(tables.tables);
match rtables
.data
.get_mapping(face, &expr.scope, expr.mapping)
.cloned()
{
Some(mut prefix) => match face.remote_mappings.get(&expr_id) {
Some(res) => {
let mut fullexpr = prefix.expr().to_string();
fullexpr.push_str(expr.suffix.as_ref());
if res.expr() != fullexpr {
tracing::error!(
"{} Resource {} remapped. Remapping unsupported!",
face,
expr_id
);
}
}
None => {
let res = Resource::get_resource(&prefix, &expr.suffix);
let (mut res, mut wtables) = if res
.as_ref()
.map(|r| r.ctx.is_some())
.unwrap_or(false)
{
drop(rtables);
let wtables = zwrite!(tables.tables);
(res.unwrap(), wtables)
} else {
let mut fullexpr = prefix.expr().to_string();
fullexpr.push_str(expr.suffix.as_ref());
let mut matches = keyexpr::new(fullexpr.as_str())
.map(|ke| Resource::get_matches(&rtables.data, ke))
.unwrap_or_default();
drop(rtables);
let mut wtables = zwrite!(tables.tables);
let mut res =
Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref());
matches.push(Arc::downgrade(&res));
Resource::match_resource(&wtables.data, &mut res, matches);
(res, wtables)
};
let ctx = get_mut_unchecked(&mut res)
.face_ctxs
.entry(face.id)
.or_insert_with(|| Arc::new(FaceContext::new(face.clone())));
get_mut_unchecked(ctx).remote_expr_id = Some(expr_id);
get_mut_unchecked(face)
.remote_mappings
.insert(expr_id, res.clone());
let tables = &mut *wtables;
let hats = &mut tables.hats;
let region = face.region;
hats[region].disable_data_routes(&mut res);
hats[region].disable_query_routes(&mut res);
face.update_interceptors_caches(&mut res);
drop(wtables);
}
},
None => tracing::error!(
"{} Declare resource with unknown scope {}!",
face,
expr.scope
),
}
}
pub(crate) fn unregister_expr(tables: &TablesLock, face: &mut Arc<FaceState>, expr_id: ExprId) {
let mut wtables = zwrite!(tables.tables);
let tables = &mut *wtables;
let hats = &mut tables.hats;
let region = face.region;
match get_mut_unchecked(face).remote_mappings.remove(&expr_id) {
Some(mut res) => {
if let Some(ctx) = get_mut_unchecked(&mut res).face_ctxs.get_mut(&face.id) {
get_mut_unchecked(ctx).remote_expr_id = None;
}
hats[region].disable_data_routes(&mut res);
hats[region].disable_query_routes(&mut res);
face.update_interceptors_caches(&mut res);
Resource::clean(&mut res);
}
None => tracing::error!("{} Undeclare unknown resource!", face),
}
drop(wtables);
}
pub(crate) fn register_expr_interest(
tables: &TablesLock,
face: &mut Arc<FaceState>,
id: InterestId,
expr: Option<&WireExpr>,
) {
if let Some(expr) = expr {
let rtables = zread!(tables.tables);
match rtables
.data
.get_mapping(face, &expr.scope, expr.mapping)
.cloned()
{
Some(mut prefix) => {
let res = Resource::get_resource(&prefix, &expr.suffix);
let (res, wtables) = if res.as_ref().map(|r| r.ctx.is_some()).unwrap_or(false) {
drop(rtables);
let wtables = zwrite!(tables.tables);
(res.unwrap(), wtables)
} else {
let mut fullexpr = prefix.expr().to_string();
fullexpr.push_str(expr.suffix.as_ref());
let mut matches = keyexpr::new(fullexpr.as_str())
.map(|ke| Resource::get_matches(&rtables.data, ke))
.unwrap_or_default();
drop(rtables);
let mut wtables = zwrite!(tables.tables);
let mut res =
Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref());
matches.push(Arc::downgrade(&res));
Resource::match_resource(&wtables.data, &mut res, matches);
(res, wtables)
};
get_mut_unchecked(face)
.remote_key_interests
.insert(id, Some(res));
drop(wtables);
}
None => tracing::error!(
"{} Declare keyexpr interest with unknown scope {}!",
face,
expr.scope,
),
}
} else {
let wtables = zwrite!(tables.tables);
get_mut_unchecked(face)
.remote_key_interests
.insert(id, None);
drop(wtables);
}
}