crb_superagent/interplay/
subscription.rs

1use super::{Fetcher, Interplay};
2use anyhow::{Result, anyhow};
3use async_trait::async_trait;
4use crb_agent::{Address, Agent, Context, MessageFor};
5use crb_core::Unique;
6use crb_send::{Recipient, Sender};
7
8pub trait SubscribeExt<S: Subscription> {
9    fn subscribe(&self, request: S) -> Fetcher<StateEntry<S>>;
10}
11
12impl<A, S> SubscribeExt<S> for Address<A>
13where
14    A: ManageSubscription<S>,
15    S: Subscription,
16{
17    fn subscribe(&self, subscription: S) -> Fetcher<StateEntry<S>> {
18        let sub_id = Unique::new(subscription);
19        let (interplay, fetcher) = Interplay::new_pair(sub_id);
20        let msg = Subscribe { interplay };
21        let res = self.send(msg);
22        fetcher.grasp(res)
23    }
24}
25
26impl<A, S> SubscribeExt<S> for Context<A>
27where
28    A: ManageSubscription<S>,
29    S: Subscription,
30{
31    fn subscribe(&self, subscription: S) -> Fetcher<StateEntry<S>> {
32        self.address().subscribe(subscription)
33    }
34}
35
36#[must_use]
37pub struct StateEntry<S: Subscription> {
38    pub state: S::State,
39    pub entry: Entry<S>,
40}
41
42pub struct Entry<S: Subscription> {
43    sub_id: Unique<S>,
44    recipient: Recipient<Unsubscribe<S>>,
45}
46
47impl<S: Subscription> Drop for Entry<S> {
48    fn drop(&mut self) {
49        let msg = Unsubscribe {
50            sub_id: self.sub_id.clone(),
51        };
52        self.recipient.send(msg).ok();
53    }
54}
55
56pub trait Subscription: Sync + Send + 'static {
57    type State: Send + 'static;
58}
59
60#[async_trait]
61pub trait ManageSubscription<S: Subscription>: Agent {
62    async fn handle(&mut self, msg: Subscribe<S>, ctx: &mut Context<Self>) -> Result<()> {
63        let sub_id = msg.interplay.request;
64        let res = self.subscribe(sub_id.clone(), ctx).await;
65        let state_entry = match res {
66            Ok(state) => {
67                let recipient = ctx.address().sender();
68                let entry = Entry { sub_id, recipient };
69                let state_entry = StateEntry { state, entry };
70                Ok(state_entry)
71            }
72            Err(err) => Err(err),
73        };
74        msg.interplay.responder.send_result(state_entry)
75    }
76
77    async fn subscribe(&mut self, sub_id: Unique<S>, _ctx: &mut Context<Self>) -> Result<S::State> {
78        Err(anyhow!(
79            "The on_subscribe method in not implemented to handle {sub_id}."
80        ))
81    }
82
83    async fn unsubscribe(&mut self, sub_id: Unique<S>, _ctx: &mut Context<Self>) -> Result<()> {
84        Err(anyhow!(
85            "The on_unsubscribe method in not implemented to handle {sub_id}"
86        ))
87    }
88}
89
90pub struct Subscribe<S: Subscription> {
91    pub interplay: Interplay<Unique<S>, StateEntry<S>>,
92}
93
94#[async_trait]
95impl<A, S> MessageFor<A> for Subscribe<S>
96where
97    A: ManageSubscription<S>,
98    S: Subscription,
99{
100    async fn handle(self: Box<Self>, agent: &mut A, ctx: &mut Context<A>) -> Result<()> {
101        agent.handle(*self, ctx).await
102    }
103}
104
105pub struct Unsubscribe<S: Subscription> {
106    pub sub_id: Unique<S>,
107}
108
109#[async_trait]
110impl<A, S> MessageFor<A> for Unsubscribe<S>
111where
112    A: ManageSubscription<S>,
113    S: Subscription,
114{
115    async fn handle(self: Box<Self>, agent: &mut A, ctx: &mut Context<A>) -> Result<()> {
116        agent.unsubscribe(self.sub_id, ctx).await
117    }
118}