use super::*;
/// Serves a blocking pop request (`is_left` selects `lpop` vs `rpop`) for `key`.
///
/// Three outcomes are possible:
///
/// * **An element is available** – it is sent to `waiter` together with the key,
///   `ShardResponse::Ok` is sent on `reply`, and the matching `LPop`/`RPop`
///   record is written to the AOF and broadcast for replication.
/// * **Nothing to pop** – `waiter` is appended to the per-key queue for the
///   requested direction and `reply` is dropped without sending a response.
/// * **The pop fails** – `ShardResponse::WrongType` is sent on `reply`
///   (every error from the keyspace is reported this way).
pub(super) fn handle_blocking_pop(
    key: &str,
    waiter: mpsc::Sender<(String, Bytes)>,
    is_left: bool,
    reply: ReplySender,
    ctx: &mut ProcessCtx<'_>,
) {
    let shard_id = ctx.shard_id;
    let fsync_policy = ctx.fsync_policy;

    let popped = if is_left {
        ctx.keyspace.lpop(key)
    } else {
        ctx.keyspace.rpop(key)
    };

    // Deal with the "no data" and "error" outcomes first so the delivery path
    // below reads straight through.
    let data = match popped {
        Ok(Some(data)) => data,
        Ok(None) => {
            let queues = if is_left {
                &mut *ctx.lpop_waiters
            } else {
                &mut *ctx.rpop_waiters
            };
            queues.entry(key.to_owned()).or_default().push_back(waiter);
            drop(reply);
            return;
        }
        Err(_) => {
            reply.send(ShardResponse::WrongType);
            return;
        }
    };

    // Best-effort delivery: a failed send is deliberately ignored.
    // NOTE(review): if the receiver is already gone the popped element is
    // discarded here — confirm that this is acceptable.
    let _ = waiter.try_send((key.to_owned(), data));
    reply.send(ShardResponse::Ok);

    // The element has left the list, so persist and replicate the pop.
    let owned_key = key.to_owned();
    let record = if is_left {
        AofRecord::LPop { key: owned_key }
    } else {
        AofRecord::RPop { key: owned_key }
    };
    aof::write_aof_record(
        &record,
        ctx.aof_writer,
        fsync_policy,
        shard_id,
        ctx.aof_errors,
        ctx.disk_full,
    );
    aof::broadcast_replication(record, ctx.replication_tx, ctx.replication_offset, shard_id);
}
/// Hands elements of the list at `key` to clients that are blocked on it.
///
/// Left-pop waiters are served first, then right-pop waiters; each queue is
/// drained from the front, i.e. in the order waiters were enqueued. For every
/// waiter:
///
/// * a waiter whose receiving side is already closed is discarded without
///   touching the list;
/// * otherwise one element is popped and sent to the waiter, and the matching
///   `LPop`/`RPop` record is written to the AOF and broadcast for replication;
/// * if the pop yields nothing or fails, the waiter is put back at the front
///   of its queue and waking stops for that direction, so the client keeps
///   waiting instead of having its channel closed underneath it.
///
/// A per-key queue that ends up empty is removed from its map.
pub(super) fn wake_blocked_waiters(key: &str, ctx: &mut ProcessCtx<'_>) {
    let fsync_policy = ctx.fsync_policy;
    let shard_id = ctx.shard_id;

    for is_left in [true, false] {
        let map = if is_left {
            &mut *ctx.lpop_waiters
        } else {
            &mut *ctx.rpop_waiters
        };

        if let Some(waiters) = map.get_mut(key) {
            while let Some(waiter) = waiters.pop_front() {
                // The receiver is gone: drop the waiter and keep the element
                // for the next one in line.
                if waiter.is_closed() {
                    continue;
                }

                let result = if is_left {
                    ctx.keyspace.lpop(key)
                } else {
                    ctx.keyspace.rpop(key)
                };

                match result {
                    Ok(Some(data)) => {
                        // Best-effort delivery: a failed send is ignored.
                        // NOTE(review): if the receiver closes between the
                        // `is_closed` check and this send, the popped element
                        // is discarded — confirm whether that window matters.
                        let _ = waiter.try_send((key.to_owned(), data));

                        let record = if is_left {
                            AofRecord::LPop {
                                key: key.to_owned(),
                            }
                        } else {
                            AofRecord::RPop {
                                key: key.to_owned(),
                            }
                        };
                        aof::write_aof_record(
                            &record,
                            ctx.aof_writer,
                            fsync_policy,
                            shard_id,
                            ctx.aof_errors,
                            ctx.disk_full,
                        );
                        aof::broadcast_replication(
                            record,
                            ctx.replication_tx,
                            ctx.replication_offset,
                            shard_id,
                        );
                    }
                    Ok(None) | Err(_) => {
                        // Nothing could be popped for this waiter. It is still
                        // live, so restore it to the head of the queue rather
                        // than dropping it.
                        waiters.push_front(waiter);
                        break;
                    }
                }
            }

            if waiters.is_empty() {
                map.remove(key);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// BLPOP on a list that already holds an element delivers it to the waiter.
    #[tokio::test]
    async fn blpop_immediate_when_list_has_data() {
        let shard = spawn_shard(
            16,
            ShardConfig::default(),
            None,
            None,
            None,
            #[cfg(feature = "protobuf")]
            None,
        );

        let pushed = shard
            .send(ShardRequest::LPush {
                key: "mylist".into(),
                values: vec![Bytes::from("hello")],
            })
            .await
            .unwrap();
        assert!(matches!(pushed, ShardResponse::Len(1)));

        let (waiter, mut delivered) = mpsc::channel(1);
        let _ = shard
            .dispatch(ShardRequest::BLPop {
                key: "mylist".into(),
                waiter,
            })
            .await;

        // Give the shard time to process the request before polling.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let (key, data) = delivered.try_recv().unwrap();
        assert_eq!(key, "mylist");
        assert_eq!(data, Bytes::from("hello"));
    }

    /// BLPOP on a missing list receives nothing until a later LPUSH arrives.
    #[tokio::test]
    async fn blpop_blocks_then_wakes_on_push() {
        let shard = spawn_shard(
            16,
            ShardConfig::default(),
            None,
            None,
            None,
            #[cfg(feature = "protobuf")]
            None,
        );

        let (waiter, mut delivered) = mpsc::channel(1);
        let _ = shard
            .dispatch(ShardRequest::BLPop {
                key: "q".into(),
                waiter,
            })
            .await;

        // Nothing has been pushed yet, so the waiter must still be empty.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(delivered.try_recv().is_err());

        let pushed = shard
            .send(ShardRequest::LPush {
                key: "q".into(),
                values: vec![Bytes::from("task1")],
            })
            .await
            .unwrap();
        assert!(matches!(pushed, ShardResponse::Len(1)));

        let (key, data) = tokio::time::timeout(Duration::from_secs(1), delivered.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(key, "q");
        assert_eq!(data, Bytes::from("task1"));
    }

    /// BRPOP on a populated list delivers the last pushed element.
    #[tokio::test]
    async fn brpop_immediate_when_list_has_data() {
        let shard = spawn_shard(
            16,
            ShardConfig::default(),
            None,
            None,
            None,
            #[cfg(feature = "protobuf")]
            None,
        );

        shard
            .send(ShardRequest::RPush {
                key: "mylist".into(),
                values: vec![Bytes::from("a"), Bytes::from("b")],
            })
            .await
            .unwrap();

        let (waiter, mut delivered) = mpsc::channel(1);
        let _ = shard
            .dispatch(ShardRequest::BRPop {
                key: "mylist".into(),
                waiter,
            })
            .await;

        // Give the shard time to process the request before polling.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let (key, data) = delivered.try_recv().unwrap();
        assert_eq!(key, "mylist");
        assert_eq!(data, Bytes::from("b"));
    }

    /// Dropping the receiving side of a blocked BLPOP must not break a later
    /// LPUSH; only the push response is asserted.
    #[tokio::test]
    async fn blpop_waiter_dropped_on_timeout() {
        let shard = spawn_shard(
            16,
            ShardConfig::default(),
            None,
            None,
            None,
            #[cfg(feature = "protobuf")]
            None,
        );

        let (waiter, receiver) = mpsc::channel(1);
        let _ = shard
            .dispatch(ShardRequest::BLPop {
                key: "q".into(),
                waiter,
            })
            .await;

        // Let the shard see the request, then drop the receiving side.
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(receiver);

        let pushed = shard
            .send(ShardRequest::LPush {
                key: "q".into(),
                values: vec![Bytes::from("data")],
            })
            .await
            .unwrap();
        assert!(matches!(pushed, ShardResponse::Len(1)));
    }

    /// Same scenario as above on a different key: a push after the waiter's
    /// receiver is dropped still reports a list length of one.
    #[tokio::test]
    async fn blpop_dead_waiter_cleanup() {
        let shard = spawn_shard(
            16,
            ShardConfig::default(),
            None,
            None,
            None,
            #[cfg(feature = "protobuf")]
            None,
        );

        let (waiter, receiver) = mpsc::channel(1);
        let _ = shard
            .dispatch(ShardRequest::BLPop {
                key: "cleanup".into(),
                waiter,
            })
            .await;

        // Let the shard see the request, then drop the receiving side.
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(receiver);

        let pushed = shard
            .send(ShardRequest::LPush {
                key: "cleanup".into(),
                values: vec![Bytes::from("x")],
            })
            .await
            .unwrap();
        assert!(matches!(pushed, ShardResponse::Len(1)));
    }
}