use anyhow::Result;
use chrono::{DateTime, Utc};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::message::{
BatchAckResult, BatchNackResult, Message, Priority, PriorityOrdering, QueueConfig, QueueStats,
};
use crate::storage::RenameQueueError;
/// Computes the 128-bit XXH3 digest of `payload` and returns it as 16 bytes
/// in the platform's native byte order.
fn hash_payload(payload: &str) -> [u8; 16] {
    let digest = xxhash_rust::xxh3::xxh3_128(payload.as_bytes());
    digest.to_ne_bytes()
}
/// Non-persistent queue storage that keeps everything in process memory.
///
/// All state lives behind `tokio` read/write locks so the storage can be
/// shared between async tasks.
pub struct MemoryStorage {
/// Ordered key/value map holding JSON-serialized values. Message keys start
/// with `{queue}/`, dead-lettered messages live under `_dlq/{queue}/` and
/// queue configurations under `_queue_config/{queue}`.
data: Arc<RwLock<BTreeMap<String, Vec<u8>>>>,
/// Message key -> lock expiry for messages that have been handed to a consumer.
locked_index: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
/// Queue name -> configuration.
queue_configs: Arc<RwLock<HashMap<String, QueueConfig>>>,
/// Queue name -> payload hashes, consulted by `push` to reject duplicates
/// when the queue's `allow_duplicates` is false.
payload_sets: Arc<RwLock<HashMap<String, HashSet<[u8; 16]>>>>,
}
impl Default for MemoryStorage {
/// Builds an empty storage; identical to calling `MemoryStorage::new()`.
fn default() -> Self {
Self::new()
}
}
/// Escapes a text priority so it can be embedded as a single `/`-delimited
/// segment of a storage key.
///
/// `%`, `/` and NUL are percent-encoded as `%25`, `%2F` and `%00`. Every other
/// character is copied through unchanged, so non-ASCII text stays valid,
/// readable UTF-8 instead of being re-encoded byte by byte. Because UTF-8
/// byte order matches code-point order, the relative order of encoded
/// priorities is unaffected.
fn encode_text_priority(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '%' => out.push_str("%25"),
            '/' => out.push_str("%2F"),
            '\0' => out.push_str("%00"),
            other => out.push(other),
        }
    }
    out
}
impl MemoryStorage {
/// Creates an empty in-memory storage.
///
/// Emits a warning through `tracing` because nothing stored here survives a
/// process restart.
pub fn new() -> Self {
    tracing::warn!("Running with in-memory storage. All data will be lost on restart.");
    Self {
        data: Arc::default(),
        locked_index: Arc::default(),
        queue_configs: Arc::default(),
        payload_sets: Arc::default(),
    }
}
/// Builds the storage key `{queue}/{priority segment}/{created_at nanos}/{id}`
/// for `msg`.
///
/// The priority segment is chosen so that the map's ascending key order
/// matches the queue's configured ordering:
/// * numeric priorities are zero-padded to 20 digits (`u64::MAX - n` for
///   `MaxFirst`, `n` for `MinFirst`, `0` for `Fifo`);
/// * text priorities are prefixed with `s:`. `MaxFirst` zero-pads the bytes
///   to at least 512, inverts each byte and hex-encodes it; `MinFirst` uses
///   the escaped text; `Fifo` uses an empty segment body.
fn generate_message_key(&self, msg: &Message, config: &QueueConfig) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    const MIN_PADDED_LEN: usize = 512;

    let ts = msg.created_at.timestamp_nanos_opt().unwrap_or(0);
    let priority_segment = match &msg.priority {
        Priority::Numeric(n) => {
            let rank = match config.ordering {
                PriorityOrdering::MaxFirst => u64::MAX - n,
                PriorityOrdering::MinFirst => *n,
                PriorityOrdering::Fifo => 0,
            };
            format!("{:020}", rank)
        }
        Priority::Text(s) => match config.ordering {
            PriorityOrdering::MaxFirst => {
                let raw = s.as_bytes();
                let padded_len = raw.len().max(MIN_PADDED_LEN);
                let mut segment = String::with_capacity(2 + padded_len * 2);
                segment.push_str("s:");
                // Zero padding is inverted to 0xFF, exactly like the payload bytes.
                let padding = std::iter::repeat(0u8).take(padded_len - raw.len());
                for byte in raw.iter().copied().chain(padding) {
                    let inverted = byte ^ 0xFF;
                    segment.push(HEX_DIGITS[usize::from(inverted >> 4)] as char);
                    segment.push(HEX_DIGITS[usize::from(inverted & 0x0F)] as char);
                }
                segment
            }
            PriorityOrdering::MinFirst => format!("s:{}", encode_text_priority(s)),
            PriorityOrdering::Fifo => String::from("s:"),
        },
    };
    format!("{}/{}/{:016}/{}", msg.queue, priority_segment, ts, msg.id)
}
async fn get_queue_config(&self, queue_name: &str) -> QueueConfig {
self.queue_configs
.read()
.await
.get(queue_name)
.cloned()
.unwrap_or_default()
}
/// Reports whether `queue_name` is known: either it has a stored
/// configuration or at least one key under the `{queue_name}/` prefix.
async fn queue_exists_inner(&self, queue_name: &str) -> Result<bool> {
    let is_configured = self.queue_configs.read().await.contains_key(queue_name);
    if is_configured {
        return Ok(true);
    }
    let data = self.data.read().await;
    let prefix = format!("{}/", queue_name);
    // The first key at or after the prefix decides: it either belongs to this
    // queue or no key does.
    let has_messages = match data.range(prefix.clone()..).next() {
        Some((first_key, _)) => first_key.starts_with(&prefix),
        None => false,
    };
    Ok(has_messages)
}
/// Stores `config` for `queue_name` and reconciles duplicate tracking with
/// the new `allow_duplicates` setting.
///
/// For a queue that already exists, the previously stored ordering is kept
/// and the ordering in `config` is ignored.
async fn configure_queue_inner(&self, queue_name: &str, config: QueueConfig) {
let old_config = self.get_queue_config(queue_name).await;
let existed = self.queue_exists_inner(queue_name).await.unwrap_or(false);
let mut effective_config = config;
if existed {
// Presumably because existing message keys were generated with the old
// ordering (see `generate_message_key`) — confirm with the author.
effective_config.ordering = old_config.ordering;
}
self.queue_configs
.write()
.await
.insert(queue_name.to_string(), effective_config.clone());
// Mirror the configuration into the data map; a serialization failure is
// silently skipped.
let config_key = format!("_queue_config/{}", queue_name);
if let Ok(config_json) = serde_json::to_vec(&effective_config) {
self.data.write().await.insert(config_key, config_json);
}
// Duplicates were allowed and no longer are: drop unlocked duplicates, then
// rebuild the payload-hash set from every message still in the queue.
if old_config.allow_duplicates && !effective_config.allow_duplicates {
let removed = self.dedupe_unlocked_messages(queue_name).await;
if removed > 0 {
tracing::info!(
"De-duplicated {} unlocked message(s) in queue '{}' after disabling duplicates",
removed,
queue_name
);
}
let data = self.data.read().await;
let prefix = format!("{}/", queue_name);
let mut set = HashSet::new();
for (k, v) in data.range(prefix.clone()..) {
if !k.starts_with(&prefix) {
break;
}
// Entries that fail to deserialize are skipped.
if let Ok(msg) = serde_json::from_slice::<Message>(v) {
set.insert(hash_payload(&msg.payload));
}
}
self.payload_sets
.write()
.await
.insert(queue_name.to_string(), set);
}
// Duplicates are allowed again: the hash set is no longer needed.
if !old_config.allow_duplicates && effective_config.allow_duplicates {
self.payload_sets.write().await.remove(queue_name);
}
}
/// Removes unlocked messages in `queue_name` whose payload was already seen
/// earlier in key order, and returns how many were removed.
///
/// Currently locked messages are never removed, but their payloads still
/// count as seen. Entries that fail to deserialize are ignored.
async fn dedupe_unlocked_messages(&self, queue_name: &str) -> usize {
    let mut data = self.data.write().await;
    let prefix = format!("{}/", queue_name);
    let now = Utc::now();
    let mut seen_payloads: HashSet<String> = HashSet::new();
    let mut duplicate_keys: Vec<String> = Vec::new();
    let queue_entries = data
        .range(prefix.clone()..)
        .take_while(|(key, _)| key.starts_with(&prefix));
    for (key, raw) in queue_entries {
        let Ok(msg) = serde_json::from_slice::<Message>(raw) else {
            continue;
        };
        let locked = matches!(msg.locked_until, Some(until) if until > now);
        let first_occurrence = seen_payloads.insert(msg.payload);
        if !locked && !first_occurrence {
            duplicate_keys.push(key.clone());
        }
    }
    let removed = duplicate_keys.len();
    for key in &duplicate_keys {
        data.remove(key);
    }
    removed
}
/// Clears the lock fields of the message stored under `key`.
///
/// Returns `Ok(true)` when a lock was cleared, `Ok(false)` when the key is
/// missing or the message carries no lock, and an error when the stored
/// value cannot be (de)serialized.
async fn unlock_message_by_key(&self, key: &str) -> Result<bool> {
    let mut data = self.data.write().await;
    let Some(raw) = data.get(key) else {
        return Ok(false);
    };
    let mut msg: Message = serde_json::from_slice(raw)?;
    if msg.locked_until.is_none() {
        return Ok(false);
    }
    msg.locked_until = None;
    msg.locked_by = None;
    let unlocked_bytes = serde_json::to_vec(&msg)?;
    data.insert(key.to_string(), unlocked_bytes);
    Ok(true)
}
}
#[async_trait::async_trait]
impl crate::api::StorageApi for MemoryStorage {
/// Reports whether `queue_name` exists, i.e. it has a stored configuration
/// or at least one message key; delegates to `queue_exists_inner`.
async fn queue_exists(&self, queue_name: &str) -> Result<bool> {
self.queue_exists_inner(queue_name).await
}
/// Creates or reconfigures `queue_name` with `config`; delegates to
/// `configure_queue_inner`, which keeps the existing ordering when the queue
/// already exists.
async fn create_queue(&self, queue_name: &str, config: QueueConfig) {
self.configure_queue_inner(queue_name, config).await;
}
/// Renames queue `from` to `to`, moving its messages, dead-letter entries,
/// configuration, lock index entries and payload-hash set.
///
/// # Errors
/// * `RenameQueueError::Storage` when either name is blank or a stored value
///   cannot be (de)serialized. Such a failure returns immediately, so entries
///   moved before it stay moved.
/// * `RenameQueueError::NotFound` when `from` does not exist.
/// * `RenameQueueError::AlreadyExists` when `to` already exists.
async fn rename_queue(
&self,
from: &str,
to: &str,
) -> std::result::Result<(), RenameQueueError> {
if from.trim().is_empty() || to.trim().is_empty() {
return Err(RenameQueueError::Storage(anyhow::anyhow!(
"queue names cannot be empty"
)));
}
// Renaming a queue to itself is a successful no-op.
if from == to {
return Ok(());
}
// NOTE(review): both existence checks finish before the data write lock
// below is taken, so a concurrent create/delete in between is not guarded.
if !self
.queue_exists_inner(from)
.await
.map_err(RenameQueueError::Storage)?
{
return Err(RenameQueueError::NotFound);
}
if self
.queue_exists_inner(to)
.await
.map_err(RenameQueueError::Storage)?
{
return Err(RenameQueueError::AlreadyExists);
}
let existing_config = self.get_queue_config(from).await;
let now = Utc::now();
let mut data = self.data.write().await;
// New keys of messages whose lock is still in the future; used to rebuild
// the lock index once the data lock has been released.
let mut moved_locked: Vec<(String, DateTime<Utc>)> = Vec::new();
let prefix_from = format!("{}/", from);
// Snapshot the entries first so the map can be mutated while re-keying.
let entries: Vec<(String, Vec<u8>)> = data
.range(prefix_from.clone()..)
.take_while(|(k, _)| k.starts_with(&prefix_from))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
for (key, value) in entries {
let mut msg: Message = serde_json::from_slice(&value).map_err(|e| {
RenameQueueError::Storage(anyhow::anyhow!("deserialize message failed: {e}"))
})?;
// The key embeds the queue name, so it is regenerated after the rename.
msg.queue = to.to_string();
let new_key = self.generate_message_key(&msg, &existing_config);
let new_value = serde_json::to_vec(&msg).map_err(|e| {
RenameQueueError::Storage(anyhow::anyhow!("serialize message failed: {e}"))
})?;
if let Some(locked_until) = msg.locked_until {
if locked_until > now {
moved_locked.push((new_key.clone(), locked_until));
}
}
data.insert(new_key, new_value);
data.remove(&key);
}
// Dead-letter entries keep their key suffix; only the queue part changes.
let dlq_prefix_from = format!("_dlq/{}/", from);
let dlq_entries: Vec<(String, Vec<u8>)> = data
.range(dlq_prefix_from.clone()..)
.take_while(|(k, _)| k.starts_with(&dlq_prefix_from))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
for (key, value) in dlq_entries {
let suffix = key
.strip_prefix(&dlq_prefix_from)
.unwrap_or_default()
.to_string();
let mut msg: Message = serde_json::from_slice(&value).map_err(|e| {
RenameQueueError::Storage(anyhow::anyhow!("deserialize DLQ message failed: {e}"))
})?;
msg.queue = to.to_string();
let new_value = serde_json::to_vec(&msg).map_err(|e| {
RenameQueueError::Storage(anyhow::anyhow!("serialize DLQ message failed: {e}"))
})?;
let new_key = format!("_dlq/{}/{}", to, suffix);
data.insert(new_key, new_value);
data.remove(&key);
}
// Move the serialized configuration to the new name.
let config_key_from = format!("_queue_config/{}", from);
let config_key_to = format!("_queue_config/{}", to);
let config_json = serde_json::to_vec(&existing_config).map_err(|e| {
RenameQueueError::Storage(anyhow::anyhow!("serialize config failed: {e}"))
})?;
data.remove(&config_key_from);
data.insert(config_key_to, config_json);
// Release the data lock before touching the other maps.
drop(data);
{
let mut configs = self.queue_configs.write().await;
configs.remove(from);
configs.insert(to.to_string(), existing_config);
}
{
// Drop every index entry of the old queue, then re-add the still-locked
// messages under their new keys.
let prefix_from = format!("{}/", from);
let mut index = self.locked_index.write().await;
index.retain(|k, _| !k.starts_with(&prefix_from));
for (key, until) in moved_locked {
index.insert(key, until);
}
}
{
// The payload-hash set, if any, follows the queue to its new name.
let mut sets = self.payload_sets.write().await;
if let Some(set) = sets.remove(from) {
sets.insert(to.to_string(), set);
}
}
Ok(())
}
/// Stores `msg` in its queue and returns the message id.
///
/// # Errors
/// Fails when the message's priority kind differs from the queue's
/// configured kind, when a text priority is empty, when the queue forbids
/// duplicates and an equal payload hash is already tracked, or when the
/// message cannot be serialized.
async fn push(&self, msg: Message) -> Result<String> {
    let config = self.get_queue_config(&msg.queue).await;
    let kind = msg.priority.kind();
    if kind != config.priority_kind {
        anyhow::bail!(
            "Priority kind mismatch: queue expects {:?} but got {:?}",
            config.priority_kind,
            kind
        );
    }
    if matches!(&msg.priority, Priority::Text(s) if s.is_empty()) {
        anyhow::bail!("Text priority must not be empty");
    }

    // Fast path: no duplicate tracking required.
    if config.allow_duplicates {
        let key = self.generate_message_key(&msg, &config);
        let value = serde_json::to_vec(&msg)?;
        self.data.write().await.insert(key, value);
        return Ok(msg.id);
    }

    // Both locks are held together so the duplicate check and the insert are
    // one atomic step.
    let payload_hash = hash_payload(&msg.payload);
    let mut data = self.data.write().await;
    let mut sets = self.payload_sets.write().await;
    let known_payloads = sets.entry(msg.queue.clone()).or_default();
    if known_payloads.contains(&payload_hash) {
        anyhow::bail!("Duplicate payload rejected");
    }
    let key = self.generate_message_key(&msg, &config);
    let value = serde_json::to_vec(&msg)?;
    data.insert(key, value);
    known_payloads.insert(payload_hash);
    Ok(msg.id)
}
async fn pop(
&self,
queue: &str,
consumer_id: &str,
timeout_secs: u64,
) -> Result<Option<Message>> {
let prefix = format!("{}/", queue);
let now = Utc::now();
let lock_until = now + chrono::Duration::seconds(timeout_secs as i64);
let mut data = self.data.write().await;
let found = {
let mut result = None;
for (k, v) in data.range(prefix.clone()..) {
if !k.starts_with(&prefix) {
break;
}
let msg: Message = serde_json::from_slice(v)?;
if let Some(locked_until) = msg.locked_until {
if locked_until > now {
continue;
}
}
result = Some((k.clone(), msg));
break;
}
result
};
if let Some((key, mut msg)) = found {
msg.locked_until = Some(lock_until);
msg.locked_by = Some(consumer_id.to_string());
msg.retry_count += 1;
let new_value = serde_json::to_vec(&msg)?;
data.insert(key.clone(), new_value);
self.locked_index.write().await.insert(key, lock_until);
Ok(Some(msg))
} else {
Ok(None)
}
}
/// Acknowledges a message: removes it from the queue, from the lock index
/// and from the queue's payload-hash set.
///
/// Only a message with id `message_id` that is locked by `consumer_id`
/// matches. Returns `Ok(false)` when no such message exists.
async fn ack(&self, queue: &str, message_id: &str, consumer_id: &str) -> Result<bool> {
    let prefix = format!("{}/", queue);
    let mut data = self.data.write().await;
    let mut target: Option<(String, Message)> = None;
    let queue_entries = data
        .range(prefix.clone()..)
        .take_while(|(k, _)| k.starts_with(&prefix));
    for (key, raw) in queue_entries {
        let candidate: Message = serde_json::from_slice(raw)?;
        if candidate.id == message_id && candidate.locked_by.as_deref() == Some(consumer_id) {
            target = Some((key.clone(), candidate));
            break;
        }
    }
    let Some((key, msg)) = target else {
        return Ok(false);
    };
    data.remove(&key);
    self.locked_index.write().await.remove(&key);
    let mut sets = self.payload_sets.write().await;
    if let Some(known_payloads) = sets.get_mut(queue) {
        known_payloads.remove(&hash_payload(&msg.payload));
    }
    Ok(true)
}
/// Negatively acknowledges a message locked by `consumer_id`.
///
/// When the message has used up its retries (`retry_count >= max_retries`),
/// its stored bytes are moved to `_dlq/{queue}/{id}` and its payload hash is
/// forgotten. Otherwise the lock is cleared so the message can be popped
/// again. In both cases the lock index entry is removed. Returns
/// `Ok(false)` when no matching message exists.
async fn nack(&self, queue: &str, message_id: &str, consumer_id: &str) -> Result<bool> {
    let prefix = format!("{}/", queue);
    let mut data = self.data.write().await;
    let mut target: Option<(String, Message)> = None;
    let queue_entries = data
        .range(prefix.clone()..)
        .take_while(|(k, _)| k.starts_with(&prefix));
    for (key, raw) in queue_entries {
        let candidate: Message = serde_json::from_slice(raw)?;
        if candidate.id == message_id && candidate.locked_by.as_deref() == Some(consumer_id) {
            target = Some((key.clone(), candidate));
            break;
        }
    }
    let Some((key, mut msg)) = target else {
        return Ok(false);
    };

    let retries_exhausted = msg.retry_count >= msg.max_retries;
    if retries_exhausted {
        // Dead-letter the bytes exactly as they are currently stored.
        let dlq_key = format!("_dlq/{}/{}", queue, msg.id);
        let stored_bytes = data.get(&key).cloned().unwrap_or_default();
        data.insert(dlq_key, stored_bytes);
        data.remove(&key);
        let mut sets = self.payload_sets.write().await;
        if let Some(known_payloads) = sets.get_mut(queue) {
            known_payloads.remove(&hash_payload(&msg.payload));
        }
    } else {
        msg.locked_until = None;
        msg.locked_by = None;
        let requeued_bytes = serde_json::to_vec(&msg)?;
        data.insert(key.clone(), requeued_bytes);
    }
    self.locked_index.write().await.remove(&key);
    Ok(true)
}
async fn renew(
&self,
queue: &str,
message_id: &str,
consumer_id: &str,
timeout_secs: u64,
) -> Result<bool> {
let prefix = format!("{}/", queue);
let now = Utc::now();
let new_expiry = now + chrono::Duration::seconds(timeout_secs as i64);
let mut data = self.data.write().await;
let found = {
let mut result = None;
for (k, v) in data.range(prefix.clone()..) {
if !k.starts_with(&prefix) {
break;
}
let msg: Message = serde_json::from_slice(v)?;
if msg.id == message_id && msg.locked_by.as_deref() == Some(consumer_id) {
result = Some((k.clone(), msg));
break;
}
}
result
};
if let Some((key, mut msg)) = found {
msg.locked_until = Some(new_expiry);
let new_value = serde_json::to_vec(&msg)?;
data.insert(key.clone(), new_value);
self.locked_index.write().await.insert(key, new_expiry);
Ok(true)
} else {
Ok(false)
}
}
/// Acknowledges every message in `message_ids` that is locked by
/// `consumer_id`, removing it from the queue, the lock index and the
/// payload-hash set.
///
/// The result lists the acknowledged ids and the requested ids that were
/// not acknowledged. An empty request returns the default result without
/// taking any lock.
async fn batch_ack(
    &self,
    queue: &str,
    consumer_id: &str,
    message_ids: &[String],
) -> Result<BatchAckResult> {
    if message_ids.is_empty() {
        return Ok(BatchAckResult::default());
    }
    let wanted_ids: HashSet<&str> = message_ids.iter().map(String::as_str).collect();
    let prefix = format!("{}/", queue);

    let mut acked: Vec<String> = Vec::new();
    let mut acked_payloads: Vec<[u8; 16]> = Vec::new();
    let mut keys_to_remove: Vec<String> = Vec::new();
    {
        let mut data = self.data.write().await;
        // Scan first; nothing is removed until every entry has deserialized.
        let queue_entries = data
            .range(prefix.clone()..)
            .take_while(|(k, _)| k.starts_with(&prefix));
        for (key, raw) in queue_entries {
            let msg: Message = serde_json::from_slice(raw)?;
            let requested = wanted_ids.contains(msg.id.as_str());
            if requested && msg.locked_by.as_deref() == Some(consumer_id) {
                acked_payloads.push(hash_payload(&msg.payload));
                keys_to_remove.push(key.clone());
                acked.push(msg.id);
            }
        }
        for key in &keys_to_remove {
            data.remove(key);
        }
    }

    if !keys_to_remove.is_empty() {
        let mut locked_index = self.locked_index.write().await;
        for key in &keys_to_remove {
            locked_index.remove(key);
        }
    }
    if !acked_payloads.is_empty() {
        let mut sets = self.payload_sets.write().await;
        if let Some(known_payloads) = sets.get_mut(queue) {
            for payload_hash in &acked_payloads {
                known_payloads.remove(payload_hash);
            }
        }
    }

    let acked_lookup: HashSet<&str> = acked.iter().map(String::as_str).collect();
    let not_found = message_ids
        .iter()
        .filter(|id| !acked_lookup.contains(id.as_str()))
        .cloned()
        .collect();
    Ok(BatchAckResult { acked, not_found })
}
/// Negatively acknowledges every message in `message_ids` that is locked by
/// `consumer_id`.
///
/// Messages that have used up their retries are moved to the dead-letter
/// prefix and reported in `dead_lettered`; the others are unlocked in place
/// and reported in `unlocked`. Requested ids that matched nothing are
/// returned in `not_found`. This implementation never adds to `dropped`.
async fn batch_nack(
&self,
queue: &str,
consumer_id: &str,
message_ids: &[String],
) -> Result<BatchNackResult> {
if message_ids.is_empty() {
return Ok(BatchNackResult::default());
}
let ids_to_find: HashSet<&str> = message_ids.iter().map(String::as_str).collect();
let prefix = format!("{}/", queue);
let mut data = self.data.write().await;
// A matched message together with the key it is stored under.
struct Found {
key: String,
msg: Message,
}
let mut targets: Vec<Found> = Vec::new();
// Snapshot the queue's entries so the map can be mutated afterwards.
let entries: Vec<(String, Vec<u8>)> = data
.range(prefix.clone()..)
.take_while(|(k, _)| k.starts_with(&prefix))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
for (key, value) in entries {
let msg: Message = serde_json::from_slice(&value)?;
if ids_to_find.contains(msg.id.as_str())
&& msg.locked_by.as_deref() == Some(consumer_id)
{
targets.push(Found { key, msg });
}
}
let mut result = BatchNackResult::default();
// Keys to drop from the lock index, and payload hashes to forget, once the
// data lock has been released.
let mut index_keys: Vec<String> = Vec::new();
let mut dlq_payloads: Vec<[u8; 16]> = Vec::new();
for Found { key, mut msg } in targets {
if msg.retry_count >= msg.max_retries {
// Retries exhausted: move the stored bytes, as they are, to the DLQ.
let dlq_key = format!("_dlq/{}/{}", queue, msg.id);
let value = data.get(&key).cloned().unwrap_or_default();
data.insert(dlq_key, value);
data.remove(&key);
result.dead_lettered.push(msg.id.clone());
dlq_payloads.push(hash_payload(&msg.payload));
} else {
// Retries remain: clear the lock so the message can be popped again.
msg.locked_until = None;
msg.locked_by = None;
let new_value = serde_json::to_vec(&msg)?;
data.insert(key.clone(), new_value);
result.unlocked.push(msg.id.clone());
}
index_keys.push(key);
}
// Every requested id that ended up in no result bucket is reported as not found.
let processed: HashSet<&str> = result
.unlocked
.iter()
.chain(result.dead_lettered.iter())
.chain(result.dropped.iter())
.map(String::as_str)
.collect();
result.not_found = message_ids
.iter()
.filter(|id| !processed.contains(id.as_str()))
.cloned()
.collect();
drop(data);
if !index_keys.is_empty() {
let mut locked_index = self.locked_index.write().await;
for key in index_keys {
locked_index.remove(&key);
}
}
if !dlq_payloads.is_empty() {
let mut sets = self.payload_sets.write().await;
if let Some(set) = sets.get_mut(queue) {
for payload_hash in dlq_payloads {
set.remove(&payload_hash);
}
}
}
Ok(result)
}
/// Deletes `queue_name` entirely: its messages, dead-letter entries, stored
/// configuration, lock index entries and payload-hash set.
///
/// Returns the number of live messages removed; dead-letter entries are not
/// counted.
async fn delete_queue(&self, queue_name: &str) -> Result<usize> {
    let message_prefix = format!("{}/", queue_name);
    let dlq_prefix = format!("_dlq/{}/", queue_name);

    let message_keys: Vec<String> = {
        let mut data = self.data.write().await;
        let message_keys: Vec<String> = data
            .range(message_prefix.clone()..)
            .map(|(k, _)| k)
            .take_while(|k| k.starts_with(&message_prefix))
            .cloned()
            .collect();
        for key in &message_keys {
            data.remove(key);
        }
        let dlq_keys: Vec<String> = data
            .range(dlq_prefix.clone()..)
            .map(|(k, _)| k)
            .take_while(|k| k.starts_with(&dlq_prefix))
            .cloned()
            .collect();
        for key in &dlq_keys {
            data.remove(key);
        }
        data.remove(&format!("_queue_config/{}", queue_name));
        message_keys
    };

    if !message_keys.is_empty() {
        let mut locked_index = self.locked_index.write().await;
        for key in &message_keys {
            locked_index.remove(key);
        }
    }
    self.queue_configs.write().await.remove(queue_name);
    self.payload_sets.write().await.remove(queue_name);
    Ok(message_keys.len())
}
/// Removes every live message of `queue_name` while keeping the queue's
/// configuration and dead-letter entries.
///
/// Also drops the matching lock index entries and empties the queue's
/// payload-hash set. Returns the number of messages removed.
async fn purge_queue(&self, queue_name: &str) -> Result<usize> {
    let prefix = format!("{}/", queue_name);
    let purged_keys: Vec<String> = {
        let mut data = self.data.write().await;
        let keys: Vec<String> = data
            .range(prefix.clone()..)
            .map(|(k, _)| k)
            .take_while(|k| k.starts_with(&prefix))
            .cloned()
            .collect();
        for key in &keys {
            data.remove(key);
        }
        keys
    };

    if !purged_keys.is_empty() {
        let mut locked_index = self.locked_index.write().await;
        for key in &purged_keys {
            locked_index.remove(key);
        }
    }
    let mut sets = self.payload_sets.write().await;
    if let Some(known_payloads) = sets.get_mut(queue_name) {
        known_payloads.clear();
    }
    Ok(purged_keys.len())
}
/// Returns available/locked/total counts for every queue, sorted by name.
///
/// Queues are discovered from message keys (first `/`-separated segment of
/// keys with at least four segments) and from stored configurations, so
/// configured but empty queues are reported with zero counts. Entries that
/// fail to deserialize are logged and not counted.
async fn get_all_queue_stats(&self) -> Result<Vec<QueueStats>> {
    // Queue name -> (available, locked).
    let mut counts: HashMap<String, (usize, usize)> = HashMap::new();
    {
        let data = self.data.read().await;
        let now = Utc::now();
        for (key, raw) in data.iter() {
            let is_internal = key.starts_with("_dlq/") || key.starts_with("_queue_config/");
            if is_internal {
                continue;
            }
            let mut segments = key.split('/');
            let Some(queue_name) = segments.next() else {
                continue;
            };
            // A message key has at least three more segments after the queue name.
            if segments.count() < 3 {
                continue;
            }
            match serde_json::from_slice::<Message>(raw) {
                Ok(msg) => {
                    let (available, locked) = counts.entry(queue_name.to_string()).or_default();
                    if msg.locked_until.is_some_and(|until| until > now) {
                        *locked += 1;
                    } else {
                        *available += 1;
                    }
                }
                Err(e) => {
                    tracing::warn!("Failed to deserialize message for key {}: {}", key, e);
                }
            }
        }
    }

    let configs = self.queue_configs.read().await;
    for configured_name in configs.keys() {
        counts.entry(configured_name.clone()).or_default();
    }
    let mut stats: Vec<QueueStats> = counts
        .into_iter()
        .map(|(name, (available, locked))| {
            let config = configs.get(&name).cloned().unwrap_or_default();
            QueueStats {
                name,
                available,
                locked,
                total: available + locked,
                config,
            }
        })
        .collect();
    stats.sort_by(|a, b| a.name.cmp(&b.name));
    Ok(stats)
}
/// Returns available/locked/total counts and the configuration of
/// `queue_name`.
///
/// A message counts as locked while its `locked_until` lies in the future.
/// Entries that fail to deserialize are logged and not counted.
async fn get_queue_stats(&self, queue_name: &str) -> Result<QueueStats> {
    let (available, locked) = {
        let data = self.data.read().await;
        let now = Utc::now();
        let prefix = format!("{}/", queue_name);
        let mut available = 0;
        let mut locked = 0;
        let queue_entries = data
            .range(prefix.clone()..)
            .take_while(|(k, _)| k.starts_with(&prefix));
        for (key, raw) in queue_entries {
            match serde_json::from_slice::<Message>(raw) {
                Ok(msg) if msg.locked_until.is_some_and(|until| until > now) => locked += 1,
                Ok(_) => available += 1,
                Err(e) => {
                    tracing::warn!("Failed to deserialize message for key {}: {}", key, e);
                }
            }
        }
        (available, locked)
    };
    let config = self.get_queue_config(queue_name).await;
    Ok(QueueStats {
        name: queue_name.to_string(),
        available,
        locked,
        total: available + locked,
        config,
    })
}
async fn list_queues(&self) -> Result<Vec<String>> {
let data = self.data.read().await;
let mut queue_names: HashSet<String> = HashSet::new();
for (key, _) in data.iter() {
if key.starts_with("_dlq/") || key.starts_with("_queue_config/") {
continue;
}
let parts: Vec<&str> = key.split('/').collect();
if parts.len() >= 4 {
queue_names.insert(parts[0].to_string());
}
}
let mut queues: Vec<String> = queue_names.into_iter().collect();
queues.sort();
Ok(queues)
}
async fn unlock_expired_messages(&self) -> Result<usize> {
let now = Utc::now();
let mut expired_keys = Vec::new();
{
let locked_index = self.locked_index.read().await;
for (key, lock_until) in locked_index.iter() {
if *lock_until <= now {
expired_keys.push(key.clone());
}
}
}
let mut unlocked_count = 0;
for key in &expired_keys {
match self.unlock_message_by_key(key).await {
Ok(true) => unlocked_count += 1,
Ok(false) => {}
Err(e) => {
tracing::error!("Failed to unlock message {}: {}", key, e);
}
}
}
if !expired_keys.is_empty() {
let mut locked_index = self.locked_index.write().await;
for key in expired_keys {
locked_index.remove(&key);
}
}
Ok(unlocked_count)
}
/// Clears the lock of every locked message in `queue_name`, whether or not
/// the lock has expired, and returns how many messages were unlocked.
///
/// Entries that fail to deserialize are skipped, and failures while
/// unlocking an individual message are ignored.
async fn force_unlock_queue(&self, queue_name: &str) -> Result<usize> {
    let prefix = format!("{}/", queue_name);
    let mut keys_to_unlock: Vec<String> = Vec::new();
    {
        let data = self.data.read().await;
        for (key, raw) in data.range(prefix.clone()..) {
            if !key.starts_with(&prefix) {
                break;
            }
            let carries_lock = matches!(
                serde_json::from_slice::<Message>(raw),
                Ok(msg) if msg.locked_until.is_some()
            );
            if carries_lock {
                keys_to_unlock.push(key.clone());
            }
        }
    }

    let mut unlocked = 0usize;
    for key in &keys_to_unlock {
        if matches!(self.unlock_message_by_key(key).await, Ok(true)) {
            unlocked += 1;
        }
    }
    if !keys_to_unlock.is_empty() {
        let mut locked_index = self.locked_index.write().await;
        for key in &keys_to_unlock {
            locked_index.remove(key);
        }
    }
    Ok(unlocked)
}
}