use crate::protocol::{
ConnAck, ConnAckProperties, ConnectReturnCode, Disconnect, DisconnectReasonCode, LastWill,
LastWillProperties, Packet, PingResp, PubAck, PubAckReason, PubComp, PubCompReason, PubRec,
PubRecReason, PubRel, PubRelReason, Publish, PublishProperties, QoS, SubAck,
SubscribeReasonCode, UnsubAck, UnsubAckReason,
};
use crate::router::alertlog::alert;
use crate::router::scheduler::{PauseReason, Tracker};
use crate::router::{ConnectionEvents, Forward};
use crate::segments::Position;
use crate::*;
use flume::{bounded, Receiver, RecvError, Sender, TryRecvError};
use slab::Slab;
use std::collections::{HashMap, HashSet, VecDeque};
use std::str::Utf8Error;
use std::thread;
use std::time::SystemTime;
use thiserror::Error;
use tracing::{debug, error, info, trace, warn};
use super::alertlog::{Alert, AlertLog};
use super::graveyard::Graveyard;
use super::iobufs::{Incoming, Outgoing};
use super::logs::{AckLog, DataLog};
use super::scheduler::{ScheduleReason, Scheduler};
use super::shared_subs::SharedGroup;
use super::{
packetid, Connection, DataRequest, Event, FilterIdx, Meter, Notification, Print, RouterMeter,
ShadowRequest, MAX_CHANNEL_CAPACITY, MAX_SCHEDULE_ITERATIONS,
};
/// Errors produced by the router event loop and the commitlog append path.
#[derive(Error, Debug)]
pub enum RouterError {
    /// Blocking receive on the router channel failed (all senders dropped).
    #[error("Receive error = {0}")]
    Recv(#[from] RecvError),
    /// Non-blocking receive on the router channel failed.
    #[error("Try Receive error = {0}")]
    TryRecv(#[from] TryRecvError),
    /// The router channel is disconnected; the loop cannot continue.
    #[error("Disconnection")]
    Disconnected,
    /// Publish topic bytes were not valid UTF-8.
    #[error("Topic not utf-8")]
    NonUtf8Topic(#[from] Utf8Error),
    /// Topic does not start with the connection's tenant prefix
    /// (expected prefix, offending topic).
    #[cfg(feature = "validate-tenant-prefix")]
    #[error("Bad Tenant")]
    BadTenant(String, String),
    /// No subscription filter matches the published topic.
    #[error("No matching filters to topic {0}")]
    NoMatchingFilters(String),
    /// Subscription filter failed prefix validation.
    #[error("Invalid filter prefix {0}")]
    InvalidFilterPrefix(Filter),
    /// Client id failed validation.
    #[error("Invalid client_id {0}")]
    InvalidClientId(String),
    /// Protocol violation requiring the client to be disconnected with
    /// the given MQTT v5 reason code.
    #[error("Disconnection (Reason: {0:?})")]
    Disconnect(DisconnectReasonCode),
}
/// Highest topic alias the broker advertises in CONNACK properties and
/// accepts from clients (aliases are 1-based; 0 is always invalid).
const TOPIC_ALIAS_MAX: u16 = 4096;
/// Central broker state machine: owns all connection state and processes
/// `Event`s received over a single channel from the network/link layer.
pub struct Router {
    /// Id of this router instance (used in thread name and meters).
    id: RouterId,
    /// Router configuration (max connections, commitlog settings, ...).
    config: RouterConfig,
    /// Persistent-session storage for disconnected clients.
    graveyard: Graveyard,
    /// Senders to registered meter consumers.
    meters: Slab<Sender<Vec<Meter>>>,
    /// Senders to registered alert consumers.
    alerts: Slab<Sender<Vec<Alert>>>,
    /// Per-connection state; slab key is the `ConnectionId`.
    connections: Slab<Connection>,
    /// client_id -> ConnectionId lookup.
    connection_map: HashMap<String, ConnectionId>,
    /// filter -> set of subscribed connection ids.
    subscription_map: HashMap<Filter, HashSet<ConnectionId>>,
    /// Incoming packet buffers, indexed by ConnectionId.
    ibufs: Slab<Incoming>,
    /// Outgoing notification buffers, indexed by ConnectionId.
    obufs: Slab<Outgoing>,
    /// Commitlog of published data, partitioned per filter.
    datalog: DataLog,
    /// Buffered alerts awaiting `send_alerts`.
    alertlog: AlertLog,
    /// Pending acks per connection, indexed by ConnectionId.
    ackslog: Slab<AckLog>,
    /// Tracks which connections are ready/parked and their data requests.
    scheduler: Scheduler,
    /// Parked data requests woken by fresh data, drained after appends.
    notifications: VecDeque<(ConnectionId, DataRequest)>,
    /// Receiving side of the router event channel.
    router_rx: Receiver<(ConnectionId, Event)>,
    /// Cloneable sending side handed out to links via `link()`.
    router_tx: Sender<(ConnectionId, Event)>,
    /// Aggregate router-level metrics.
    router_meters: RouterMeter,
    /// Reusable packet buffer swapped in/out of `Incoming::exchange`
    /// to avoid per-iteration allocation. Always `Some` between events.
    cache: Option<VecDeque<Packet>>,
    /// Shared-subscription groups keyed by group name.
    shared_subscriptions: HashMap<String, SharedGroup>,
    /// Last-will messages keyed by client_id, published on ungraceful exit.
    last_wills: HashMap<String, (LastWill, Option<LastWillProperties>)>,
}
impl Router {
    /// Create a router with empty state sized from `config`.
    ///
    /// All per-connection slabs are pre-allocated with `max_connections`
    /// capacity; slab keys double as `ConnectionId`s, so these slabs must
    /// always be inserted/removed in lock-step (see `handle_new_connection`).
    pub fn new(router_id: RouterId, config: RouterConfig) -> Router {
        // Bounded channel: backpressures links if the router falls behind.
        let (router_tx, router_rx) = bounded(1000);
        let meters = Slab::with_capacity(10);
        let alerts = Slab::with_capacity(10);
        let connections = Slab::with_capacity(config.max_connections);
        let ibufs = Slab::with_capacity(config.max_connections);
        let obufs = Slab::with_capacity(config.max_connections);
        let ackslog = Slab::with_capacity(config.max_connections);
        let router_metrics = RouterMeter {
            router_id,
            ..RouterMeter::default()
        };
        let max_connections = config.max_connections;
        Router {
            id: router_id,
            config: config.clone(),
            graveyard: Graveyard::new(),
            meters,
            alerts,
            connections,
            connection_map: Default::default(),
            subscription_map: Default::default(),
            ibufs,
            obufs,
            // NOTE(review): unwrap here means a bad commitlog config
            // panics at startup — presumably intentional fail-fast.
            datalog: DataLog::new(config.clone()).unwrap(),
            alertlog: AlertLog::new(config),
            ackslog,
            scheduler: Scheduler::with_capacity(max_connections),
            notifications: VecDeque::with_capacity(1024),
            router_rx,
            router_tx,
            router_meters: router_metrics,
            cache: Some(VecDeque::with_capacity(MAX_CHANNEL_CAPACITY)),
            shared_subscriptions: HashMap::new(),
            last_wills: HashMap::new(),
        }
    }
fn link(&self) -> Sender<(ConnectionId, Event)> {
self.router_tx.clone()
}
#[tracing::instrument(skip_all)]
pub fn spawn(mut self) -> Sender<(ConnectionId, Event)> {
let router = thread::Builder::new().name(format!("router-{}", self.id));
let link = self.link();
router
.spawn(move || {
let e = self.run(0);
error!(reason=?e, "Router done!");
})
.unwrap();
link
}
#[tracing::instrument(skip_all)]
fn run(&mut self, count: usize) -> Result<(), RouterError> {
match count {
0 => loop {
self.run_inner()?;
},
n => {
for _ in 0..n {
self.run_inner()?;
}
}
};
Ok(())
}
    /// One iteration of the router loop: block for an event only if there is
    /// no outgoing work pending, then batch-drain incoming events, then do a
    /// bounded amount of outgoing (consume) work.
    fn run_inner(&mut self) -> Result<(), RouterError> {
        // Only block on the channel when the scheduler has nothing ready;
        // otherwise stay busy and rely on try_recv below for fairness.
        if self.consume().is_none() {
            let (id, data) = self.router_rx.recv()?;
            self.events(id, data);
        }
        // Drain up to 500 already-queued events without blocking.
        for _ in 0..500 {
            match self.router_rx.try_recv() {
                Ok((id, data)) => self.events(id, data),
                Err(TryRecvError::Disconnected) => return Err(RouterError::Disconnected),
                Err(TryRecvError::Empty) => break,
            }
        }
        // Debug-only sanity check: a connection must never appear twice in
        // the ready queue.
        #[cfg(debug_assertions)]
        if let Some(readyqueue) = self.scheduler.check_readyqueue_duplicates() {
            warn!(
                "Connection was scheduled multiple times in readyqueue: {:?}",
                readyqueue
            );
        }
        // Bounded outgoing work so incoming events are serviced regularly.
        for _ in 0..100 {
            self.consume();
        }
        Ok(())
    }
    /// Dispatch a single incoming event to its handler. `id` is the
    /// ConnectionId of the originating link (meaningless for router-wide
    /// events like `SendMeters`).
    fn events(&mut self, id: ConnectionId, data: Event) {
        let span = tracing::error_span!("[>] incoming", connection_id = id);
        let _guard = span.enter();
        match data {
            Event::Connect {
                connection,
                incoming,
                outgoing,
            } => self.handle_new_connection(connection, incoming, outgoing),
            Event::NewMeter(tx) => self.handle_new_meter(tx),
            Event::NewAlert(tx) => self.handle_new_alert(tx),
            Event::DeviceData => self.handle_device_payload(id),
            Event::Disconnect => self.handle_disconnection(id, None),
            Event::Ready => self.scheduler.reschedule(id, ScheduleReason::Ready),
            Event::Shadow(request) => {
                retrieve_shadow(&mut self.datalog, &mut self.obufs[id], request)
            }
            Event::SendAlerts => {
                self.send_alerts();
            }
            Event::SendMeters => {
                self.send_meters();
            }
            Event::PrintStatus(metrics) => print_status(self, metrics),
            // tenant_id is only consumed when tenant validation is enabled.
            Event::PublishWill((client_id, _tenant_id)) => self.handle_last_will(
                client_id,
                #[cfg(feature = "validate-tenant-prefix")]
                _tenant_id,
            ),
        }
    }
    /// Register a freshly accepted client connection.
    ///
    /// Validates the client id, evicts an existing connection with the same
    /// id (unless duplicates are allowed by feature), restores persistent
    /// session state from the graveyard when `clean_session == false`,
    /// allocates slab slots, queues the CONNACK (plus replayed PUBRELs for a
    /// restored QoS2 session) and schedules the connection.
    fn handle_new_connection(
        &mut self,
        mut connection: Connection,
        incoming: Incoming,
        mut outgoing: Outgoing,
    ) {
        let client_id = outgoing.client_id.clone();
        if let Err(err) = validate_clientid(&client_id) {
            error!("Invalid client_id: {}", err);
            return;
        };
        let span = tracing::info_span!("incoming_connect", client_id);
        let _guard = span.enter();
        // MQTT session takeover: the newer connection wins.
        if cfg!(not(feature = "allow-duplicate-clientid")) {
            let connection_id = self.connection_map.get(&client_id);
            if let Some(connection_id) = connection_id {
                error!(
                    "Duplicate client_id, dropping previous connection with connection_id: {}",
                    connection_id
                );
                self.handle_disconnection(*connection_id, None);
            }
        }
        if self.connections.len() >= self.config.max_connections {
            error!("no space for new connection");
            return;
        }
        let saved = self.graveyard.retrieve(&client_id);
        let clean_session = connection.clean;
        let previous_session = saved.as_ref().is_some_and(|s| s.session_state.is_some());
        // PUBRELs that were unacked when the previous session ended; replayed
        // below after the CONNACK is queued.
        let mut pending_acks = VecDeque::new();
        let tracker = if !clean_session {
            let saved_state = saved.and_then(|saved| {
                connection.events = saved.metrics;
                saved.session_state
            });
            saved_state.map_or_else(
                || Tracker::new(client_id.clone()),
                |session_state| {
                    connection.subscriptions = session_state.subscriptions;
                    pending_acks.clone_from(&session_state.unacked_pubrels);
                    outgoing.unacked_pubrels = session_state.unacked_pubrels;
                    session_state.tracker
                },
            )
        } else {
            // Clean session: keep only the historical event log, fresh tracker.
            connection.events = saved.map_or_else(ConnectionEvents::default, |s| s.metrics);
            Tracker::new(client_id.clone())
        };
        let ackslog = AckLog::new();
        let time = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
            Ok(v) => v.as_millis().to_string(),
            Err(e) => format!("Time error = {e:?}"),
        };
        let event = "connection at ".to_owned() + &time + ", clean = " + &clean_session.to_string();
        // Keep only the last 10 lifecycle events per connection.
        connection.events.events.push_back(event);
        if connection.events.events.len() > 10 {
            connection.events.events.pop_front();
        }
        // Stash the last will; published later only on ungraceful disconnect
        // (graceful DISCONNECT removes it in handle_device_payload).
        if let Some(will) = connection.last_will.take() {
            self.last_wills.insert(
                client_id.clone(),
                (will, connection.last_will_properties.take()),
            );
        }
        // Slab keys must agree across all per-connection slabs — the
        // asserts enforce that invariant.
        let connection_id = self.connections.insert(connection);
        assert_eq!(self.ibufs.insert(incoming), connection_id);
        assert_eq!(self.obufs.insert(outgoing), connection_id);
        self.connection_map.insert(client_id.clone(), connection_id);
        info!(connection_id, "Client connection registered");
        assert_eq!(self.ackslog.insert(ackslog), connection_id);
        assert_eq!(self.scheduler.add(tracker), connection_id);
        debug_assert!(self
            .scheduler
            .check_tracker_duplicates(connection_id)
            .is_none());
        let ack = ConnAck {
            session_present: !clean_session && previous_session,
            code: ConnectReturnCode::Success,
        };
        let properties = ConnAckProperties {
            topic_alias_max: Some(TOPIC_ALIAS_MAX),
            ..Default::default()
        };
        let ackslog = self.ackslog.get_mut(connection_id).unwrap();
        ackslog.connack(connection_id, ack, Some(properties));
        // Replay unacked PUBRELs from the restored session (QoS2 recovery).
        pending_acks.into_iter().for_each(|pkid| {
            let pubrel = PubRel {
                pkid,
                reason: PubRelReason::Success,
            };
            ackslog.pubrel(pubrel)
        });
        self.scheduler
            .reschedule(connection_id, ScheduleReason::Init);
        self.router_meters.total_connections += 1;
    }
fn handle_new_meter(&mut self, tx: Sender<Vec<Meter>>) {
let _meter_id = self.meters.insert(tx);
}
fn handle_new_alert(&mut self, tx: Sender<Vec<Alert>>) {
let _alert_id = self.alerts.insert(tx);
}
    /// Tear down a connection, optionally sending a DISCONNECT with `reason`
    /// first. For persistent sessions (`clean == false`) the tracker,
    /// subscriptions and unacked state are saved to the graveyard with
    /// cursors rewound to the oldest unacked message for retransmission.
    fn handle_disconnection(&mut self, id: ConnectionId, reason: Option<DisconnectReasonCode>) {
        // obufs is the canonical "is this connection still alive" check.
        let client_id = match &self.obufs.get(id) {
            Some(v) => v.client_id.clone(),
            None => {
                error!("no-connection id {} is already gone", id);
                return;
            }
        };
        let span = tracing::info_span!("incoming_disconnect", client_id);
        let _guard = span.enter();
        info!("Disconnecting connection");
        // Push a DISCONNECT packet to the device before dismantling state.
        if let Some(reason_code) = reason {
            let outgoing = match self.obufs.get_mut(id) {
                Some(v) => v,
                None => {
                    error!("no-connection id {} is already gone", id);
                    return;
                }
            };
            let disconnect = Disconnect { reason_code };
            let disconnect_notification = Notification::Disconnect(disconnect, None);
            outgoing
                .data_buffer
                .lock()
                .push_back(disconnect_notification);
            // Best effort wakeup; the link may already be gone.
            outgoing.handle.try_send(()).ok();
        }
        // Remove all per-connection slab entries (keys stay in lock-step).
        let mut connection = self.connections.remove(id);
        let _incoming = self.ibufs.remove(id);
        let outgoing = self.obufs.remove(id);
        let mut tracker = self.scheduler.remove(id);
        self.connection_map.remove(&client_id);
        self.ackslog.remove(id);
        // Reclaim data requests parked in the datalog waiters.
        let inflight_data_requests = self.datalog.clean(id);
        // filter_idx -> cursor of the oldest unacked publish, for replay.
        let retransmissions = outgoing.retransmission_map();
        // Drop this client from shared groups; delete empty groups.
        self.shared_subscriptions.retain(|_, group| {
            group.remove_client(&client_id);
            !group.is_empty()
        });
        for filter in connection.subscriptions.iter() {
            if let Some(connections) = self.subscription_map.get_mut(filter) {
                connections.remove(&id);
            }
        }
        let time = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
            Ok(v) => v.as_millis().to_string(),
            Err(e) => format!("Time error = {e:?}"),
        };
        let event = "disconnection at ".to_owned() + &time;
        connection.events.events.push_back(event);
        if connection.events.events.len() > 10 {
            connection.events.events.pop_front();
        }
        if !connection.clean {
            // Persistent session: merge parked requests back into the
            // tracker, rewind cursors to unacked messages, and park in the
            // graveyard for a future reconnect.
            inflight_data_requests
                .into_iter()
                .for_each(|r| tracker.register_data_request(r));
            for request in tracker.data_requests.iter_mut() {
                if let Some(cursor) = retransmissions.get(&request.filter_idx) {
                    request.cursor = *cursor;
                    // Shared group cursor must rewind too so the group
                    // re-delivers from the same point.
                    if let Some(group_name) = &request.group {
                        self.shared_subscriptions
                            .get_mut(group_name)
                            .expect("group must exists")
                            .cursor = *cursor;
                    }
                }
            }
            self.graveyard.save_state(
                tracker,
                connection.subscriptions,
                connection.events,
                outgoing.unacked_pubrels,
            );
        } else {
            // Clean session: only the metrics history is worth keeping.
            tracker.pause(PauseReason::Busy);
            let id = tracker.id.clone();
            self.graveyard.save_metrics(id, connection.events);
        }
        self.router_meters.total_connections -= 1;
    }
    /// Process every packet the link buffered for connection `id`.
    ///
    /// Packets are swapped out of the incoming buffer via the reusable
    /// `self.cache` deque, handled one by one, and the (drained) buffer is
    /// swapped back. Side effects are accumulated in flags: `force_ack`
    /// (acks queued, reschedule to flush them), `new_data` (commitlog grew,
    /// wake parked subscribers) and `disconnect` (protocol error, tear the
    /// connection down at the end).
    fn handle_device_payload(&mut self, id: ConnectionId) {
        let incoming = match self.ibufs.get_mut(id) {
            Some(v) => v,
            None => {
                error!("no-connection id {} is already gone", id);
                return;
            }
        };
        let client_id = incoming.client_id.clone();
        let span = tracing::error_span!("incoming_payload", client_id);
        let _guard = span.enter();
        // Swap the empty cache in, take the filled packet buffer out.
        let mut packets = incoming.exchange(self.cache.take().unwrap());
        let mut force_ack = false;
        let mut new_data = false;
        let mut disconnect = false;
        let mut disconnect_reason: Option<DisconnectReasonCode> = None;
        for packet in packets.drain(0..) {
            match packet {
                Packet::Publish(publish, properties) => {
                    let span = tracing::error_span!("publish", topic = ?publish.topic, pkid = publish.pkid);
                    let _guard = span.enter();
                    let qos = publish.qos;
                    let pkid = publish.pkid;
                    match qos {
                        QoS::AtLeastOnce => {
                            // QoS1: queue PUBACK now, append below.
                            let puback = PubAck {
                                pkid,
                                reason: PubAckReason::Success,
                            };
                            let ackslog = self.ackslog.get_mut(id).unwrap();
                            ackslog.puback(puback);
                            force_ack = true;
                        }
                        QoS::ExactlyOnce => {
                            // QoS2: park the publish with the PUBREC; it is
                            // appended only when the PUBREL arrives, hence
                            // `continue` (skip the append below).
                            let pubrec = PubRec {
                                pkid,
                                reason: PubRecReason::Success,
                            };
                            let ackslog = self.ackslog.get_mut(id).unwrap();
                            ackslog.pubrec(publish, properties, pubrec);
                            force_ack = true;
                            continue;
                        }
                        QoS::AtMostOnce => {
                            // QoS0: nothing to ack.
                        }
                    };
                    self.router_meters.total_publishes += 1;
                    match append_to_commitlog(
                        id,
                        publish.clone(),
                        properties,
                        &mut self.datalog,
                        &mut self.notifications,
                        &mut self.connections,
                    ) {
                        Ok(_offset) => {
                            new_data = true;
                        }
                        Err(e) => {
                            error!(
                                reason = ?e, "Failed to append to commitlog"
                            );
                            self.router_meters.failed_publishes += 1;
                            disconnect = true;
                            // Only protocol-level errors carry a reason code
                            // to send back in DISCONNECT.
                            if let RouterError::Disconnect(code) = e {
                                disconnect_reason = Some(code)
                            }
                            break;
                        }
                    };
                    let meter = &mut self.ibufs.get_mut(id).unwrap().meter;
                    if let Err(e) = meter.register_publish(&publish) {
                        error!(
                            reason = ?e, "Failed to write to incoming meter"
                        );
                    };
                }
                Packet::Subscribe(mut subscribe, props) => {
                    let mut return_codes = Vec::new();
                    let pkid = subscribe.pkid;
                    for f in &mut subscribe.filters {
                        let span =
                            tracing::info_span!("subscribe", topic = f.path, pkid = subscribe.pkid);
                        let _guard = span.enter();
                        info!("Adding subscription on topic {}", f.path);
                        let connection = self.connections.get_mut(id).unwrap();
                        if let Err(e) = validate_subscription(connection, f) {
                            warn!(reason = ?e,"Subscription cannot be validated: {}", e);
                            disconnect = true;
                            break;
                        }
                        // Shared subscription filters ($share/group/topic)
                        // split into group name + plain filter.
                        let mut filter = f.path.clone();
                        let mut group = None;
                        if let Some((grp, filter_path)) = extract_group(&f.path) {
                            group = Some(grp);
                            filter = filter_path;
                        };
                        let subscription_id = props.as_ref().and_then(|p| p.id);
                        // [MQTT-3.8.2-1]: subscription id 0 is a protocol error.
                        if subscription_id == Some(0) {
                            error!("Subscription identifier can't be 0");
                            disconnect = true;
                            disconnect_reason = Some(DisconnectReasonCode::ProtocolError);
                            break;
                        }
                        let (idx, cursor) = self.datalog.next_native_offset(&filter);
                        self.prepare_filter(id, cursor, idx, f, group, subscription_id);
                        // Granted QoS mirrors the requested QoS.
                        let code = match f.qos {
                            QoS::AtMostOnce => SubscribeReasonCode::QoS0,
                            QoS::AtLeastOnce => SubscribeReasonCode::QoS1,
                            QoS::ExactlyOnce => SubscribeReasonCode::QoS2,
                        };
                        return_codes.push(code);
                    }
                    let suback = SubAck { pkid, return_codes };
                    let ackslog = self.ackslog.get_mut(id).unwrap();
                    ackslog.suback(suback);
                    force_ack = true;
                }
                Packet::Unsubscribe(unsubscribe, _) => {
                    let connection = self.connections.get_mut(id).unwrap();
                    let pkid = unsubscribe.pkid;
                    for filter in &unsubscribe.filters {
                        let span = tracing::info_span!("unsubscribe", topic = filter, pkid);
                        let _guard = span.enter();
                        debug!("Removing subscription on filter {}", filter);
                        if let Some(connection_ids) = self.subscription_map.get_mut(filter) {
                            let removed = connection_ids.remove(&id);
                            // Not subscribed under this filter: nothing to do.
                            if !removed {
                                continue;
                            }
                            let meter = &mut self.ibufs.get_mut(id).unwrap().meter;
                            meter.unregister_subscription(filter);
                            if !connection.subscriptions.remove(filter) {
                                warn!(
                                    pkid = unsubscribe.pkid,
                                    "Unsubscribe failed as filter was not subscribed previously"
                                );
                                continue;
                            }
                            // Drop the client from shared groups and clear
                            // per-filter alias / subscription-id state.
                            self.shared_subscriptions.retain(|_, group| {
                                group.remove_client(&client_id);
                                !group.is_empty()
                            });
                            if let Some(broker_aliases) = connection.broker_topic_aliases.as_mut() {
                                broker_aliases.remove_alias(filter);
                            }
                            connection.subscription_ids.remove(filter);
                            let unsuback = UnsubAck {
                                pkid,
                                reasons: vec![UnsubAckReason::Success],
                            };
                            let ackslog = self.ackslog.get_mut(id).unwrap();
                            ackslog.unsuback(unsuback);
                            self.scheduler.untrack(id, filter);
                            self.datalog.remove_waiters_for_id(id, filter);
                            force_ack = true;
                        }
                    }
                }
                Packet::PubAck(puback, _) => {
                    let span = tracing::info_span!("puback", pkid = puback.pkid);
                    let _guard = span.enter();
                    let outgoing = self.obufs.get_mut(id).unwrap();
                    let pkid = puback.pkid;
                    // Acks must arrive in order; anything else is fatal.
                    if outgoing.register_ack(pkid).is_none() {
                        error!(pkid, "Unsolicited/ooo ack received for pkid {}", pkid);
                        disconnect = true;
                        break;
                    }
                    // Freed an inflight slot; more data can be forwarded.
                    self.scheduler.reschedule(id, ScheduleReason::IncomingAck);
                }
                Packet::PubRec(pubrec, _) => {
                    let span = tracing::info_span!("pubrec", pkid = pubrec.pkid);
                    let _guard = span.enter();
                    let outgoing = self.obufs.get_mut(id).unwrap();
                    let pkid = pubrec.pkid;
                    if outgoing.register_ack(pkid).is_none() {
                        error!(pkid, "Unsolicited/ooo ack received for pkid {}", pkid);
                        disconnect = true;
                        break;
                    }
                    // QoS2 outbound: answer PUBREC with PUBREL and remember
                    // the pkid until PUBCOMP closes the exchange.
                    let ackslog = self.ackslog.get_mut(id).unwrap();
                    let pubrel = PubRel {
                        pkid: pubrec.pkid,
                        reason: PubRelReason::Success,
                    };
                    outgoing.register_pubrec(pubrel.pkid);
                    ackslog.pubrel(pubrel);
                    self.scheduler.reschedule(id, ScheduleReason::IncomingAck);
                }
                // NOTE(review): only matches PUBREL with no properties; a
                // PUBREL carrying properties falls through to the catch-all
                // below and is ignored — confirm this is intended.
                Packet::PubRel(pubrel, None) => {
                    let span = tracing::info_span!("pubrel", pkid = pubrel.pkid);
                    let _guard = span.enter();
                    let ackslog = self.ackslog.get_mut(id).unwrap();
                    let pubcomp = PubComp {
                        pkid: pubrel.pkid,
                        reason: PubCompReason::Success,
                    };
                    // Retrieve the publish parked at PUBREC time; its absence
                    // means an unsolicited PUBREL.
                    let (publish, props) = match ackslog.pubcomp(pubcomp) {
                        Some(v) => v,
                        None => {
                            disconnect = true;
                            break;
                        }
                    };
                    // QoS2 inbound completes: append the stored publish now.
                    match append_to_commitlog(
                        id,
                        publish,
                        props,
                        &mut self.datalog,
                        &mut self.notifications,
                        &mut self.connections,
                    ) {
                        Ok(_offset) => {
                            new_data = true;
                        }
                        Err(e) => {
                            error!(
                                reason = ?e, "Failed to append to commitlog"
                            );
                            self.router_meters.failed_publishes += 1;
                            disconnect = true;
                            break;
                        }
                    };
                    self.scheduler.reschedule(id, ScheduleReason::IncomingAck);
                }
                Packet::PubComp(pubcomp, _) => {
                    let span = tracing::info_span!("pubcomp", pkid = pubcomp.pkid);
                    let _guard = span.enter();
                    let outgoing = self.obufs.get_mut(id).unwrap();
                    let pkid = pubcomp.pkid;
                    if outgoing.register_pubcomp(pkid).is_none() {
                        error!(
                            pkid,
                            "ack received for pkid {}, but the pkid didn't exists!", pkid
                        );
                        disconnect = true;
                        break;
                    }
                }
                Packet::PingReq(_) => {
                    let ackslog = self.ackslog.get_mut(id).unwrap();
                    ackslog.pingresp(PingResp);
                    force_ack = true;
                }
                Packet::Disconnect(_, _) => {
                    let span = tracing::info_span!("disconnect");
                    let _guard = span.enter();
                    disconnect = true;
                    // Graceful disconnect: the last will must not fire.
                    self.last_wills.remove(&client_id);
                    break;
                }
                incoming => {
                    warn!(packet=?incoming, "Unexpected packet received, ignoring the packet." );
                }
            }
        }
        // Return the drained buffer to the cache for the next call.
        self.cache = Some(packets);
        if force_ack {
            self.scheduler.reschedule(id, ScheduleReason::FreshData);
        }
        // Wake every subscriber whose parked request matched the new data.
        if new_data {
            while let Some((id, request)) = self.notifications.pop_front() {
                self.scheduler.track(id, request);
                self.scheduler.reschedule(id, ScheduleReason::FreshData);
            }
        }
        if disconnect {
            self.handle_disconnection(id, disconnect_reason);
        }
    }
    /// Wire up a new (or re-asserted) subscription for connection `id`:
    /// update the filter->connections map, register shared-group membership
    /// and the subscription identifier, and — only on first subscribe to
    /// this filter — create and schedule a `DataRequest` starting at
    /// `cursor`.
    fn prepare_filter(
        &mut self,
        id: ConnectionId,
        cursor: Offset,
        filter_idx: FilterIdx,
        filter: &protocol::Filter,
        group: Option<String>,
        subscription_id: Option<usize>,
    ) {
        let filter_path = &filter.path;
        match self.subscription_map.get_mut(filter_path) {
            Some(connections) => {
                connections.insert(id);
            }
            None => {
                let mut connections = HashSet::new();
                connections.insert(id);
                self.subscription_map
                    .insert(filter_path.clone(), connections);
            }
        }
        let connection = self.connections.get_mut(id).unwrap();
        if let Some(group_name) = &group {
            let client_id = connection.client_id.clone();
            // First subscriber in the group fixes the group's start cursor.
            let shared_group = self
                .shared_subscriptions
                .entry(group_name.to_string())
                .or_insert(SharedGroup::new(
                    cursor,
                    self.config.shared_subscriptions_strategy.clone(),
                ));
            shared_group.add_client(client_id);
        };
        if let Some(subscription_id) = subscription_id {
            connection
                .subscription_ids
                .insert(filter_path.clone(), subscription_id);
        }
        // Retained messages go only to non-shared subscribers.
        let forward_retained = group.is_none();
        // `insert` returns false on a duplicate subscribe — in that case the
        // existing DataRequest keeps its cursor and nothing is re-tracked.
        if connection.subscriptions.insert(filter_path.clone()) {
            let request = DataRequest {
                filter: filter_path.clone(),
                filter_idx,
                qos: filter.qos as u8,
                cursor,
                read_count: 0,
                max_count: 100,
                forward_retained,
                group,
            };
            self.scheduler.track(id, request);
            self.scheduler.reschedule(id, ScheduleReason::NewFilter);
            debug_assert!(self.scheduler.check_tracker_duplicates(id).is_none())
        }
        let meter = &mut self.ibufs.get_mut(id).unwrap().meter;
        meter.register_subscription(filter_path.clone());
    }
    /// Pop the next ready connection from the scheduler and forward as much
    /// data to it as its buffers/inflight window allow.
    ///
    /// Returns `None` only when the ready queue is empty (used by
    /// `run_inner` to decide whether to block on the event channel).
    fn consume(&mut self) -> Option<()> {
        let (id, mut requests) = self.scheduler.poll()?;
        let span = tracing::info_span!("[<] outgoing", connection_id = id);
        let _guard = span.enter();
        let outgoing = match self.obufs.get_mut(id) {
            Some(v) => v,
            None => {
                error!("Connection is already disconnected");
                return Some(());
            }
        };
        let ackslog = self.ackslog.get_mut(id).unwrap();
        let datalog = &mut self.datalog;
        let alertlog = &mut self.alertlog;
        trace!("Consuming requests");
        // Flush any pending acks before forwarding publishes.
        ack_device_data(ackslog, outgoing);
        let connection = &mut self.connections[id];
        // Requests skipped because a shared group's turn belongs to another
        // client; re-tracked at the end so they are retried later.
        let mut skipped_requests: VecDeque<DataRequest> = VecDeque::new();
        for _ in 0..MAX_SCHEDULE_ITERATIONS {
            let mut request = match requests.pop_front() {
                Some(request) => request,
                None => {
                    // Every request is either parked (caught up) or skipped;
                    // pause only when nothing was skipped.
                    if skipped_requests.is_empty() {
                        self.scheduler.pause(id, PauseReason::Caughtup);
                    }
                    self.scheduler.trackv(id, skipped_requests);
                    return Some(());
                }
            };
            let shared_group = request
                .group
                .as_ref()
                .and_then(|name| self.shared_subscriptions.get_mut(name));
            match forward_device_data(
                &mut request,
                datalog,
                outgoing,
                alertlog,
                connection,
                shared_group,
            ) {
                ConsumeStatus::BufferFull => {
                    requests.push_back(request);
                    self.scheduler.pause(id, PauseReason::Busy);
                    break;
                }
                ConsumeStatus::InflightFull => {
                    requests.push_back(request);
                    self.scheduler.pause(id, PauseReason::InflightFull);
                    break;
                }
                ConsumeStatus::FilterCaughtup => {
                    // Park in the datalog; woken when new data is appended.
                    let filter = &request.filter;
                    trace!(filter, "Filter caughtup {filter}, parking connection");
                    datalog.park(id, request);
                }
                ConsumeStatus::PartialRead => {
                    requests.push_back(request);
                }
                ConsumeStatus::SkipRequest => {
                    skipped_requests.push_back(request);
                }
            }
        }
        // Iteration budget exhausted or a pause hit `break`: keep everything
        // tracked so it is retried on the next poll.
        requests.extend(skipped_requests);
        self.scheduler.trackv(id, requests);
        Some(())
    }
    /// Publish the stored last-will message for `client_id`, if any.
    ///
    /// Called when a connection ends without a graceful DISCONNECT (the
    /// graceful path removes the will beforehand). With the
    /// `validate-tenant-prefix` feature, the will topic must live under the
    /// tenant's prefix.
    pub fn handle_last_will(
        &mut self,
        client_id: String,
        #[cfg(feature = "validate-tenant-prefix")] tenant_id: Option<String>,
    ) {
        #[cfg(feature = "validate-tenant-prefix")]
        let tenant_prefix = tenant_id.map(|id| format!("/tenants/{id}/"));
        // No stored will (or already consumed): nothing to do.
        let Some((will, will_props)) = self.last_wills.remove(&client_id) else {
            return;
        };
        let publish = Publish {
            dup: false,
            qos: will.qos,
            retain: will.retain,
            topic: will.topic,
            pkid: 0,
            payload: will.message,
        };
        // Map the will properties onto publish properties 1:1.
        let properties = will_props.map(|props| PublishProperties {
            payload_format_indicator: props.payload_format_indicator,
            message_expiry_interval: props.message_expiry_interval,
            response_topic: props.response_topic,
            correlation_data: props.correlation_data,
            user_properties: props.user_properties,
            content_type: props.content_type,
            ..Default::default()
        });
        match append_will_message(
            publish,
            properties,
            &mut self.datalog,
            &mut self.notifications,
            #[cfg(feature = "validate-tenant-prefix")]
            tenant_prefix,
        ) {
            Ok(_offset) => {
                // Wake subscribers parked on the will's topic.
                while let Some((id, request)) = self.notifications.pop_front() {
                    self.scheduler.track(id, request);
                    self.scheduler.reschedule(id, ScheduleReason::FreshData);
                }
            }
            Err(e) => {
                error!(
                    reason = ?e, "Failed to append to commitlog"
                );
                self.router_meters.failed_publishes += 1;
            }
        };
    }
fn send_meters(&mut self) {
let mut meters = Vec::with_capacity(10);
if let Some(router_meter) = self.router_meters.get() {
meters.push(Meter::Router(self.id, router_meter));
}
for f in self.subscription_map.keys() {
let filter = f.to_owned();
if let Some(subscription_meter) = self.datalog.meter(f).and_then(|meter| meter.get()) {
meters.push(Meter::Subscription(filter, subscription_meter));
}
}
if !meters.is_empty() {
for (meter_id, link) in self.meters.iter() {
if let Err(e) = link.try_send(meters.clone()) {
error!(meter_id, "Failed to send meter. Error = {:?}", e);
}
}
}
}
fn send_alerts(&mut self) {
let alerts = self.alertlog.take();
if !alerts.is_empty() {
let alerts: Vec<Alert> = alerts.into();
for (meter_id, link) in self.alerts.iter() {
if let Err(e) = link.try_send(alerts.clone()) {
error!(meter_id, "Failed to send alert. Error = {:?}", e);
}
}
}
}
}
/// Append a client publish to every matching filter's commitlog partition.
///
/// Handles topic-alias resolution, (optionally) tenant-prefix validation,
/// and the retained-message store. Waiting subscribers for each touched
/// filter are pushed into `notifications`. Returns the offset of the last
/// filter appended to.
///
/// Errors: `RouterError::Disconnect` on protocol violations (subscription
/// identifier present, bad alias), `NonUtf8Topic`, `BadTenant` and
/// `NoMatchingFilters` otherwise.
fn append_to_commitlog(
    id: ConnectionId,
    mut publish: Publish,
    mut properties: Option<PublishProperties>,
    datalog: &mut DataLog,
    notifications: &mut VecDeque<(ConnectionId, DataRequest)>,
    connections: &mut Slab<Connection>,
) -> Result<Offset, RouterError> {
    let connection = connections.get_mut(id).unwrap();
    // Take the alias out so it is never persisted with the message.
    let topic_alias = properties.as_mut().and_then(|p| {
        p.topic_alias.take()
    });
    // [MQTT-3.3.4-6]: clients must not send subscription identifiers.
    if properties
        .as_ref()
        .is_some_and(|p| !p.subscription_identifiers.is_empty())
    {
        error!("A PUBLISH packet sent from a Client to a Server MUST NOT contain a Subscription Identifier");
        return Err(RouterError::Disconnect(
            DisconnectReasonCode::MalformedPacket,
        ));
    }
    // Resolve/record the alias; may replace an empty topic with the mapped one.
    if let Some(alias) = topic_alias {
        validate_and_set_topic_alias(&mut publish, connection, alias)?;
    };
    let topic = std::str::from_utf8(&publish.topic)?;
    #[cfg(feature = "validate-tenant-prefix")]
    if let Some(tenant_prefix) = &connection.tenant_prefix {
        if !topic.starts_with(tenant_prefix) {
            return Err(RouterError::BadTenant(
                tenant_prefix.to_owned(),
                topic.to_owned(),
            ));
        }
    }
    // Retained-store semantics: empty payload clears, retain flag stores.
    if publish.payload.is_empty() {
        datalog.remove_from_retained_publishes(topic.to_owned());
    } else if publish.retain {
        datalog.insert_to_retained_publishes(publish.clone(), properties.clone(), topic.to_owned());
    }
    // Forwarded publishes never carry the retain flag.
    publish.retain = false;
    let pkid = publish.pkid;
    let filter_idxs = datalog.matches(topic);
    let filter_idxs = match filter_idxs {
        Some(v) => v,
        // Dynamic filters: create a filter for the topic on the fly.
        None if connection.dynamic_filters => {
            let (idx, _cursor) = datalog.next_native_offset(topic);
            vec![idx]
        }
        None => return Err(RouterError::NoMatchingFilters(topic.to_owned())),
    };
    let mut o = (0, 0);
    for filter_idx in filter_idxs {
        let datalog = datalog.native.get_mut(filter_idx).unwrap();
        let publish_data = (publish.clone(), properties.clone());
        let (offset, filter) = datalog.append(publish_data.into(), notifications);
        debug!(
            pkid,
            "Appended to commitlog: {}[{}, {})", filter, offset.0, offset.1,
        );
        o = offset;
    }
    Ok(o)
}
/// Append a last-will publish to every matching filter's commitlog partition.
///
/// Mirrors `append_to_commitlog` but is connection-independent: no topic
/// aliases, no dynamic-filter fallback, and (when enabled) tenant validation
/// uses the prefix captured at connect time instead of connection state.
fn append_will_message(
    mut publish: Publish,
    properties: Option<PublishProperties>,
    datalog: &mut DataLog,
    notifications: &mut VecDeque<(ConnectionId, DataRequest)>,
    #[cfg(feature = "validate-tenant-prefix")] tenant_prefix: Option<String>,
) -> Result<Offset, RouterError> {
    // [MQTT-3.3.4-6]: clients must not send subscription identifiers.
    if properties
        .as_ref()
        .is_some_and(|p| !p.subscription_identifiers.is_empty())
    {
        error!("A PUBLISH packet sent from a Client to a Server MUST NOT contain a Subscription Identifier");
        return Err(RouterError::Disconnect(
            DisconnectReasonCode::MalformedPacket,
        ));
    }
    let topic = std::str::from_utf8(&publish.topic)?;
    #[cfg(feature = "validate-tenant-prefix")]
    if let Some(tenant_prefix) = tenant_prefix {
        if !topic.starts_with(&tenant_prefix) {
            return Err(RouterError::BadTenant(
                tenant_prefix.to_owned(),
                topic.to_owned(),
            ));
        }
    }
    // Retained-store semantics: empty payload clears, retain flag stores.
    if publish.payload.is_empty() {
        datalog.remove_from_retained_publishes(topic.to_owned());
    } else if publish.retain {
        datalog.insert_to_retained_publishes(publish.clone(), properties.clone(), topic.to_owned());
    }
    publish.retain = false;
    let pkid = publish.pkid;
    let filter_idxs = datalog.matches(topic);
    let filter_idxs = match filter_idxs {
        Some(v) => v,
        None => return Err(RouterError::NoMatchingFilters(topic.to_owned())),
    };
    let mut o = (0, 0);
    for filter_idx in filter_idxs {
        let datalog = datalog.native.get_mut(filter_idx).unwrap();
        let publish_data = (publish.clone(), properties.clone());
        let (offset, filter) = datalog.append(publish_data.into(), notifications);
        debug!(
            pkid,
            "Appended to commitlog: {}[{}, {})", filter, offset.0, offset.1,
        );
        o = offset;
    }
    Ok(o)
}
/// Resolve or record a client topic alias on `publish`.
///
/// An empty topic plus a known alias substitutes the mapped topic; a
/// non-empty topic (re)binds the alias. Out-of-range aliases and unknown
/// aliases with an empty topic are protocol errors that disconnect the
/// client.
fn validate_and_set_topic_alias(
    publish: &mut Publish,
    connection: &mut Connection,
    alias: u16,
) -> Result<(), RouterError> {
    // Aliases are 1..=TOPIC_ALIAS_MAX (the maximum we advertise in CONNACK).
    if !(1..=TOPIC_ALIAS_MAX).contains(&alias) {
        error!("Alias must be greater than 0 and <={TOPIC_ALIAS_MAX}");
        return Err(RouterError::Disconnect(
            DisconnectReasonCode::TopicAliasInvalid,
        ));
    }
    if publish.topic.is_empty() {
        // Empty topic: the alias must already be bound.
        match connection.topic_aliases.get(&alias) {
            Some(known_topic) => publish.topic = known_topic.to_owned().into(),
            None => {
                error!("Empty topic name with invalid alias");
                return Err(RouterError::Disconnect(DisconnectReasonCode::ProtocolError));
            }
        }
    } else {
        // Non-empty topic: (re)bind the alias to it.
        let topic = std::str::from_utf8(&publish.topic)?;
        connection.topic_aliases.insert(alias, topic.to_owned());
        trace!("set alias {alias} for topic {topic}");
    }
    Ok(())
}
/// Move every pending ack for this connection into its outgoing data buffer
/// and wake the connection handle. Returns whether anything was queued.
fn ack_device_data(ackslog: &mut AckLog, outgoing: &mut Outgoing) -> bool {
    let span = tracing::info_span!("outgoing_ack", client_id = outgoing.client_id);
    let _guard = span.enter();
    let pending = ackslog.readv();
    if pending.is_empty() {
        debug!("No acks pending");
        return false;
    }
    let mut sent = 0;
    // Buffer lock is held through the wakeup, same as the rest of the file.
    let mut queue = outgoing.data_buffer.lock();
    for ack in pending.drain(..) {
        let pkid = packetid(&ack);
        trace!(pkid, "Ack added for pkid {}", pkid);
        queue.push_back(Notification::DeviceAck(ack));
        sent += 1;
    }
    debug!(acks_count = sent, "Acks sent to device");
    // Best-effort wakeup; the link may already be gone.
    outgoing.handle.try_send(()).ok();
    true
}
/// Outcome of one `forward_device_data` attempt, telling `consume` how to
/// reschedule the request.
enum ConsumeStatus {
    /// Outgoing channel is (nearly) full; pause the connection as Busy.
    BufferFull,
    /// QoS>0 inflight window exhausted; pause until acks free slots.
    InflightFull,
    /// No more data for this filter; park the request in the datalog.
    FilterCaughtup,
    /// Read a batch but more data remains; retry the request.
    PartialRead,
    /// Shared-subscription turn belongs to another client; retry later.
    SkipRequest,
}
/// Read a batch of publishes for `request` from the datalog and push them
/// into the connection's outgoing buffer.
///
/// Batch size is bounded by the QoS>0 inflight window (or the configured max
/// packet count for QoS0), retained messages are delivered first (once),
/// topic aliases and subscription identifiers are applied per publish, and
/// shared-subscription groups are advanced round-robin.
fn forward_device_data(
    request: &mut DataRequest,
    datalog: &mut DataLog,
    outgoing: &mut Outgoing,
    alertlog: &mut AlertLog,
    connection: &mut Connection,
    shared_group: Option<&mut SharedGroup>,
) -> ConsumeStatus {
    let span = tracing::info_span!("outgoing_publish", client_id = outgoing.client_id);
    let _guard = span.enter();
    // Shared subscriptions read from the group's cursor, not the request's.
    if let Some(ref shared_group) = shared_group {
        request.cursor = shared_group.cursor;
    }
    trace!(
        "Reading from datalog: {}[{}, {}]",
        request.filter,
        request.cursor.0,
        request.cursor.1
    );
    // QoS>0: batch bounded by free inflight slots; QoS0: by config.
    let mut inflight_slots = if request.qos != 0 {
        let len = outgoing.free_slots();
        if len == 0 {
            trace!("Aborting read from datalog: inflight capacity reached");
            return ConsumeStatus::InflightFull;
        }
        len as u64
    } else {
        datalog.config.max_outgoing_packet_count
    };
    // Round-robin groups hand out one message per turn.
    if shared_group
        .as_ref()
        .is_some_and(|g| g.strategy == Strategy::RoundRobin)
    {
        inflight_slots = 1;
    }
    let mut publishes = Vec::new();
    // One-shot retained-message delivery on a fresh subscription.
    if request.forward_retained {
        let mut retained_publishes = datalog.read_retained_messages(&request.filter);
        retained_publishes.truncate(inflight_slots as usize);
        publishes.extend(retained_publishes.into_iter().map(|p| (p, None)));
        inflight_slots -= publishes.len() as u64;
        request.forward_retained = false;
    }
    let (next, publishes_from_datalog) =
        match datalog.native_readv(request.filter_idx, request.cursor, inflight_slots) {
            Ok(v) => v,
            Err(e) => {
                error!(error = ?e, "Failed to read from commitlog {}", e);
                return ConsumeStatus::FilterCaughtup;
            }
        };
    publishes.extend(
        publishes_from_datalog
            .into_iter()
            .map(|(p, offset)| (p, Some(offset))),
    );
    let (start, next, caughtup) = match next {
        Position::Next { start, end } => (start, end, false),
        Position::Done { start, end } => (start, end, true),
    };
    // Shared group: only the group's current client may deliver this turn.
    if let Some(ref shared_group) = shared_group {
        let skip_current_client = Some(&outgoing.client_id) != shared_group.current_client();
        if skip_current_client {
            return if caughtup {
                ConsumeStatus::FilterCaughtup
            } else {
                ConsumeStatus::SkipRequest
            };
        }
    }
    // A cursor jump means the log was truncated/compacted under us; alert.
    if start != request.cursor {
        let error = format!(
            "Read cursor start jumped from {:?} to {:?} on {}",
            request.cursor, start, request.filter
        );
        warn!(
            request_cursor = ?request.cursor,
            start_cursor = ?start,
            error
        );
        let alert = alert::cursorjump(&outgoing.client_id, &request.filter, 0);
        alertlog.log(alert);
    }
    trace!(
        "Read from commitlog, cursor = {}[{}, {}), read count = {}",
        request.filter,
        next.0,
        next.1,
        publishes.len()
    );
    let qos = request.qos;
    let filter_idx = request.filter_idx;
    request.read_count += publishes.len();
    request.cursor = next;
    if publishes.is_empty() {
        return ConsumeStatus::FilterCaughtup;
    }
    // Broker-side topic alias: first use sends topic+alias, later uses send
    // only the alias with an empty topic.
    let broker_topic_aliases = &mut connection.broker_topic_aliases;
    let mut topic_alias = broker_topic_aliases
        .as_ref()
        .and_then(|aliases| aliases.get_alias(&request.filter));
    let topic_alias_already_exists = topic_alias.is_some();
    if !topic_alias_already_exists {
        topic_alias = broker_topic_aliases
            .as_mut()
            .and_then(|broker_aliases| broker_aliases.set_new_alias(&request.filter))
    }
    let subscription_id = connection.subscription_ids.get(&request.filter);
    let forwards = publishes
        .into_iter()
        .map(|((mut publish, mut properties), offset)| {
            // Downgrade/upgrade to the subscription's granted QoS.
            publish.qos = protocol::qos(qos).unwrap();
            if topic_alias.is_some() {
                let mut props = properties.unwrap_or_default();
                props.topic_alias = topic_alias;
                properties = Some(props);
            }
            if topic_alias_already_exists {
                publish.topic.clear()
            }
            if let Some(&subscription_id) = subscription_id {
                let mut props = properties.unwrap_or_default();
                props.subscription_identifiers.push(subscription_id);
                properties = Some(props);
            }
            Forward {
                cursor: offset,
                size: 0,
                publish,
                properties,
            }
        });
    let (len, inflight) = outgoing.push_forwards(forwards, qos, filter_idx);
    debug!(
        inflight_count = inflight,
        forward_count = len,
        "Forwarding publishes, cursor = {}[{}, {}) forward count = {}",
        request.filter,
        request.cursor.0,
        request.cursor.1,
        len
    );
    if len >= MAX_CHANNEL_CAPACITY - 1 {
        debug!("Outgoing channel reached its capacity");
        // Tell the link to stop scheduling until it drains the buffer.
        outgoing.push_notification(Notification::Unschedule);
        outgoing.handle.try_send(()).ok();
        return ConsumeStatus::BufferFull;
    }
    outgoing.handle.try_send(()).ok();
    // Advance the shared group to the next client and persist the cursor.
    if let Some(share) = shared_group {
        share.update_next_client();
        share.cursor = request.cursor;
    }
    if caughtup {
        ConsumeStatus::FilterCaughtup
    } else {
        ConsumeStatus::PartialRead
    }
}
/// Replies to a shadow request: looks up the stored shadow state for the
/// requested filter in the datalog and pushes it onto the connection's
/// outgoing buffer. If no shadow exists for the filter, nothing is sent.
fn retrieve_shadow(datalog: &mut DataLog, outgoing: &mut Outgoing, shadow: ShadowRequest) {
    // Nothing stored for this filter -> no reply.
    let reply = match datalog.shadow(&shadow.filter) {
        Some(reply) => reply,
        None => return,
    };

    let publish = reply.0;
    let message = Notification::Shadow(router::ShadowReply {
        topic: publish.topic,
        payload: publish.payload,
    });

    // If the outgoing buffer is at channel capacity after this push, queue
    // an Unschedule so the scheduler stops driving this connection.
    let len = outgoing.push_notification(message);
    if len >= MAX_CHANNEL_CAPACITY - 1 {
        outgoing.push_notification(Notification::Unschedule);
    }

    // Wake the connection task; a full/closed wake channel is best-effort.
    outgoing.handle.try_send(()).ok();
}
/// Dumps the requested slice of router state to stdout for
/// debugging/inspection (config, meters, per-connection metrics,
/// subscriptions, waiters, or the ready queue).
///
/// Fix: `Print::Connection` used `unwrap_or(Tracker::new(id))`, which
/// constructs a `Tracker` eagerly even when session state is present
/// (clippy `or_fun_call`); replaced with the lazy `unwrap_or_else` form.
fn print_status(router: &mut Router, metrics: Print) {
    match metrics {
        Print::Config => {
            let config = router.config.clone();
            println!("{config:#?}");
        }
        Print::Router => {
            let metrics = router.router_meters.clone();
            println!("{metrics:#?}");
        }
        Print::Connection(id) => {
            // Live connection: read events + tracker from the active maps.
            // NOTE(review): these unwraps assume connection_map entries are
            // always in sync with `connections`/`trackers` — confirm.
            let metrics = router.connection_map.get(&id).map(|v| {
                let c = router
                    .connections
                    .get(*v)
                    .map(|v| v.events.clone())
                    .unwrap();
                let t = router.scheduler.trackers.get(*v).cloned().unwrap();
                (c, t)
            });
            // Not live: fall back to the graveyard for a disconnected client.
            let metrics = match metrics {
                Some(v) => Some(v),
                None => router.graveyard.retrieve(&id).map(|v| {
                    (
                        v.metrics,
                        // Lazily build a fresh tracker only when no saved
                        // session state exists.
                        v.session_state
                            .map(|s| s.tracker)
                            .unwrap_or_else(|| Tracker::new(id)),
                    )
                }),
            };
            println!("{metrics:#?}");
        }
        Print::Subscriptions => {
            // filter -> client ids currently subscribed to it.
            let metrics: HashMap<Filter, Vec<String>> = router
                .subscription_map
                .iter()
                .map(|(filter, connections)| {
                    let connections = connections
                        .iter()
                        .map(|id| router.obufs[*id].client_id.clone())
                        .collect();
                    (filter.to_owned(), connections)
                })
                .collect();
            println!("{metrics:#?}");
        }
        Print::Subscription(filter) => {
            let metrics = router.datalog.meter(&filter);
            println!("{metrics:#?}");
        }
        Print::Waiters(filter) => {
            // Connections parked on this filter waiting for new data.
            if let Some(waiters) = router.datalog.waiters(&filter) {
                let v: Vec<(String, DataRequest)> = waiters
                    .waiters()
                    .iter()
                    .map(|(id, request)| (router.obufs[*id].client_id.clone(), request.clone()))
                    .collect();
                println!("{v:#?}");
            }
        }
        Print::ReadyQueue => {
            let metrics = router.scheduler.readyqueue.clone();
            println!("{metrics:#?}");
        }
    };
}
/// Validates a subscription filter for this connection.
///
/// Rejects '$'-prefixed filters unless they are shared subscriptions, and
/// (when the `validate-tenant-prefix` feature is on) rejects filters that
/// fall outside the connection's tenant prefix.
fn validate_subscription(
    connection: &mut Connection,
    filter: &protocol::Filter,
) -> Result<(), RouterError> {
    trace!(
        "validate subscription = {}, tenant = {:?}",
        filter.path,
        connection.tenant_prefix
    );

    // Multi-tenant mode: every filter must live under this tenant's prefix.
    #[cfg(feature = "validate-tenant-prefix")]
    if let Some(tenant_prefix) = &connection.tenant_prefix {
        if !filter.path.starts_with(tenant_prefix) {
            return Err(RouterError::InvalidFilterPrefix(filter.path.to_owned()));
        }
    }

    // '$'-prefixed topics are reserved; only shared subscriptions pass.
    // NOTE(review): "$share" (without the trailing '/') also matches here,
    // so e.g. "$sharefoo/x" is accepted — confirm whether that is intended.
    let path = &filter.path;
    let reserved_non_shared = path.starts_with('$') && !path.starts_with("$share");
    if reserved_non_shared {
        return Err(RouterError::InvalidFilterPrefix(path.to_owned()));
    }

    Ok(())
}
/// Rejects client ids containing reserved/wildcard characters
/// (`+`, `$`, `#`, `/`) with `RouterError::InvalidClientId`.
fn validate_clientid(client_id: &str) -> Result<(), RouterError> {
    trace!("Validating Client ID = {}", client_id);
    // Single pass over the id, matching against the forbidden set.
    let has_forbidden_char = client_id
        .chars()
        .any(|c| matches!(c, '+' | '$' | '#' | '/'));
    if has_forbidden_char {
        return Err(RouterError::InvalidClientId(client_id.to_string()));
    }
    Ok(())
}
/// Splits a shared-subscription filter of the form `$share/<group>/<path>`
/// into its `(group, path)` parts. Returns `None` when the filter is not a
/// shared subscription or has no path segment after the group.
fn extract_group(filter: &str) -> Option<(String, String)> {
    let rest = filter.strip_prefix("$share/")?;
    let (group, path) = rest.split_once('/')?;
    Some((group.to_owned(), path.to_owned()))
}