use crate::Commands;
use crate::message::PubMsg;
use crate::shard::Shard;
use kevy_resp::ArgvView;
impl<C: Commands> Shard<C> {
pub(crate) fn broadcast_notification(&mut self, channel: &[u8], payload: &[u8]) {
let (_count, channel_bits) = self
.pubsub
.read()
.expect("pubsub registry")
.get(channel)
.copied()
.unwrap_or((0, 0));
let (_pcount, pat_bits) = self.pattern_match_for_channel(channel);
let bits = channel_bits | pat_bits;
if bits == 0 {
return;
}
let m: PubMsg = std::sync::Arc::new((channel.to_vec(), payload.to_vec()));
for s in 0..self.nshards {
if bits & (1u64 << s) == 0 {
continue;
}
if s == self.id {
self.deliver_publish(&m.0, &m.1);
} else {
self.publish_batch[s].push(m.clone());
}
}
}
pub(crate) fn notify_keyspace_event(&mut self, event: &[u8], key: &[u8]) {
if self.notify_flags.keyspace {
let mut chan = Vec::with_capacity(b"__keyspace@0__:".len() + key.len());
chan.extend_from_slice(b"__keyspace@0__:");
chan.extend_from_slice(key);
self.broadcast_notification(&chan, event);
}
if self.notify_flags.keyevent {
let mut chan = Vec::with_capacity(b"__keyevent@0__:".len() + event.len());
chan.extend_from_slice(b"__keyevent@0__:");
chan.extend_from_slice(event);
self.broadcast_notification(&chan, key);
}
}
pub(crate) fn maybe_notify_dispatch<A: ArgvView + ?Sized>(&mut self, args: &A) {
if self.notify_flags.is_empty() {
return;
}
let Some(class) = self.commands.notify_class(args) else { return };
if !class.enabled_in(&self.notify_flags) {
return;
}
let Some(verb_raw) = args.first() else { return };
if args.len() < 2 {
return;
}
let key = args[1].to_vec();
let event = ascii_lower(verb_raw);
self.notify_keyspace_event(&event, &key);
}
pub(crate) fn maybe_notify_del(&mut self, keys: &[Vec<u8>]) {
if self.notify_flags.is_empty() || !self.notify_flags.generic {
return;
}
for k in keys {
self.notify_keyspace_event(b"del", k);
}
}
pub(crate) fn maybe_notify_mset(&mut self, pairs: &[(Vec<u8>, Vec<u8>)]) {
if self.notify_flags.is_empty() || !self.notify_flags.string {
return;
}
for (k, _) in pairs {
self.notify_keyspace_event(b"set", k);
}
}
pub(crate) fn maybe_notify_flush(&mut self) {
if self.notify_flags.is_empty() || !self.notify_flags.generic || !self.notify_flags.keyevent
{
return;
}
let mut chan = Vec::from(b"__keyevent@0__:flushdb".as_slice());
let _ = &mut chan; self.broadcast_notification(b"__keyevent@0__:flushdb", b"");
}
}
fn ascii_lower(s: &[u8]) -> Vec<u8> {
let mut out = Vec::with_capacity(s.len());
for &b in s {
out.push(b.to_ascii_lowercase());
}
out
}