proxy_sdk/queue.rs
1use crate::{check_concern, hostcalls, RootContext, Status};
2
/// Handle to a proxy-wasm shared queue, wrapping the `u32` queue id handed back by the host calls.
///
/// Shared Queues in proxy-wasm are a FIFO MPMC queue with *no message duplication*.
/// Any WASM VM can resolve a queue or register new ones in their own VM ID.
/// Any WASM VM can dequeue data, which will globally dequeue that item. Messages are not
/// replicated to each WASM VM.
///
/// When broadcasting data to many WASM VMs, it's advised to have a scheme where each thread
/// registers its own inbound queue, then enqueues the name of said queue to the centralized
/// source of data. That source then enqueues to each WASM VM's queue individually.
///
/// The handle is a plain `Copy` value: it can be printed with `{:?}` and used as a key in
/// hashed collections (two handles are equal, and hash equally, when their ids match).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Queue(pub(crate) u32);
9
10impl Queue {
11 /// Registers a new queue under a given name. Names are globally unique underneath a single VM ID.
12 /// Re-registering the same name from *any WASM VM* in the same VM ID will overwrite the previous registration of that name, and is not advised.
13 pub fn register(name: impl AsRef<str>) -> Result<Self, Status> {
14 hostcalls::register_shared_queue(name.as_ref()).map(Self)
15 }
16
17 /// Resolves an existing queue for a given name in the given VM ID.
18 pub fn resolve(vm_id: impl AsRef<str>, name: impl AsRef<str>) -> Result<Option<Self>, Status> {
19 hostcalls::resolve_shared_queue(vm_id.as_ref(), name.as_ref()).map(|x| x.map(Self))
20 }
21
22 /// Remove an item from this queue, if any is present. Returns `Ok(None)` when no data is enqueued.
23 /// Note that this is not VM-local and any message can only be received by one dequeue operation *anywhere*.
24 pub fn dequeue(&self) -> Result<Option<Vec<u8>>, Status> {
25 hostcalls::dequeue_shared_queue(self.0)
26 }
27
28 /// Enqueues a new item into this queue.
29 pub fn enqueue(&self, value: impl AsRef<[u8]>) -> Result<(), Status> {
30 hostcalls::enqueue_shared_queue(self.0, value)
31 }
32
33 /// Registers a callback that is called whenever data is available in the queue to be dequeued.
34 /// Only one of `on_enqueue` or `on_receive` can be set at the same time.
35 pub fn on_enqueue<R: RootContext>(self, callback: impl FnMut(&mut R, Queue) + 'static) -> Self {
36 crate::dispatcher::register_queue_callback(self.0, callback);
37 self
38 }
39
40 /// Registers a callback that is called whenever data is available in the queue to be dequeued.
41 /// Also dequeues anything on the queue. It may call the callback multiple times for each item, if multiple are present.
42 /// Only one of `on_enqueue` or `on_receive` can be set at the same time.
43 pub fn on_receive<R: RootContext>(
44 self,
45 mut callback: impl FnMut(&mut R, Queue, Vec<u8>) + 'static,
46 ) -> Self {
47 crate::dispatcher::register_queue_callback(self.0, move |root, queue| {
48 while let Some(dequeued) = check_concern("queue-receive", queue.dequeue()).flatten() {
49 callback(root, queue, dequeued);
50 }
51 });
52 self
53 }
54}
55
56impl PartialEq<u32> for Queue {
57 fn eq(&self, other: &u32) -> bool {
58 self.0 == *other
59 }
60}
61
62impl PartialEq<Queue> for u32 {
63 fn eq(&self, other: &Queue) -> bool {
64 other == self
65 }
66}