crb_superagent/interplay/
subscription.rs1use super::{Fetcher, Interplay};
2use anyhow::{Result, anyhow};
3use async_trait::async_trait;
4use crb_agent::{Address, Agent, Context, MessageFor};
5use crb_core::Unique;
6use crb_send::{Recipient, Sender};
7
8pub trait SubscribeExt<S: Subscription> {
9 fn subscribe(&self, request: S) -> Fetcher<StateEntry<S>>;
10}
11
12impl<A, S> SubscribeExt<S> for Address<A>
13where
14 A: ManageSubscription<S>,
15 S: Subscription,
16{
17 fn subscribe(&self, subscription: S) -> Fetcher<StateEntry<S>> {
18 let sub_id = Unique::new(subscription);
19 let (interplay, fetcher) = Interplay::new_pair(sub_id);
20 let msg = Subscribe { interplay };
21 let res = self.send(msg);
22 fetcher.grasp(res)
23 }
24}
25
26impl<A, S> SubscribeExt<S> for Context<A>
27where
28 A: ManageSubscription<S>,
29 S: Subscription,
30{
31 fn subscribe(&self, subscription: S) -> Fetcher<StateEntry<S>> {
32 self.address().subscribe(subscription)
33 }
34}
35
36#[must_use]
37pub struct StateEntry<S: Subscription> {
38 pub state: S::State,
39 pub entry: Entry<S>,
40}
41
42pub struct Entry<S: Subscription> {
43 sub_id: Unique<S>,
44 recipient: Recipient<Unsubscribe<S>>,
45}
46
47impl<S: Subscription> Drop for Entry<S> {
48 fn drop(&mut self) {
49 let msg = Unsubscribe {
50 sub_id: self.sub_id.clone(),
51 };
52 self.recipient.send(msg).ok();
53 }
54}
55
/// A subscription request type.
///
/// The implementing value is what gets passed to `SubscribeExt::subscribe`.
pub trait Subscription: Send + Sync + 'static {
    /// State returned to the subscriber when the subscription succeeds.
    type State: Send + 'static;
}
59
60#[async_trait]
61pub trait ManageSubscription<S: Subscription>: Agent {
62 async fn handle(&mut self, msg: Subscribe<S>, ctx: &mut Context<Self>) -> Result<()> {
63 let sub_id = msg.interplay.request;
64 let res = self.subscribe(sub_id.clone(), ctx).await;
65 let state_entry = match res {
66 Ok(state) => {
67 let recipient = ctx.address().sender();
68 let entry = Entry { sub_id, recipient };
69 let state_entry = StateEntry { state, entry };
70 Ok(state_entry)
71 }
72 Err(err) => Err(err),
73 };
74 msg.interplay.responder.send_result(state_entry)
75 }
76
77 async fn subscribe(&mut self, sub_id: Unique<S>, _ctx: &mut Context<Self>) -> Result<S::State> {
78 Err(anyhow!(
79 "The on_subscribe method in not implemented to handle {sub_id}."
80 ))
81 }
82
83 async fn unsubscribe(&mut self, sub_id: Unique<S>, _ctx: &mut Context<Self>) -> Result<()> {
84 Err(anyhow!(
85 "The on_unsubscribe method in not implemented to handle {sub_id}"
86 ))
87 }
88}
89
90pub struct Subscribe<S: Subscription> {
91 pub interplay: Interplay<Unique<S>, StateEntry<S>>,
92}
93
94#[async_trait]
95impl<A, S> MessageFor<A> for Subscribe<S>
96where
97 A: ManageSubscription<S>,
98 S: Subscription,
99{
100 async fn handle(self: Box<Self>, agent: &mut A, ctx: &mut Context<A>) -> Result<()> {
101 agent.handle(*self, ctx).await
102 }
103}
104
105pub struct Unsubscribe<S: Subscription> {
106 pub sub_id: Unique<S>,
107}
108
109#[async_trait]
110impl<A, S> MessageFor<A> for Unsubscribe<S>
111where
112 A: ManageSubscription<S>,
113 S: Subscription,
114{
115 async fn handle(self: Box<Self>, agent: &mut A, ctx: &mut Context<A>) -> Result<()> {
116 agent.unsubscribe(self.sub_id, ctx).await
117 }
118}