// crb_superagent/interplay/subscription.rs

use super::{Fetcher, Interplay};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use crb_agent::{Address, Agent, Context, MessageFor};
use crb_core::UniqueId;
use crb_send::{Recipient, Sender};

/// Extension trait that lets a handle start a subscription of type `S`.
///
/// This module implements it for [`Address`] and [`Context`] of any agent
/// that implements [`ManageSubscription<S>`].
pub trait SubscribeExt<S>
where
    S: Subscription,
{
    /// Submits `request` and returns a [`Fetcher`] that resolves to the
    /// resulting [`StateEntry`].
    fn subscribe(&self, request: S) -> Fetcher<StateEntry<S>>;
}

impl<A, S> SubscribeExt<S> for Address<A>
where
    A: ManageSubscription<S>,
    S: Subscription,
{
    /// Wraps `subscription` in a [`UniqueId`], sends a [`Subscribe`] message
    /// built around it to the agent behind this address, and returns the
    /// paired [`Fetcher`].
    ///
    /// The outcome of the send is handed to the fetcher through `grasp`
    /// rather than being returned to the caller directly.
    fn subscribe(&self, subscription: S) -> Fetcher<StateEntry<S>> {
        let (interplay, fetcher) = Interplay::new_pair(UniqueId::new(subscription));
        let delivery = self.send(Subscribe { interplay });
        fetcher.grasp(delivery)
    }
}

impl<A, S> SubscribeExt<S> for Context<A>
where
    A: ManageSubscription<S>,
    S: Subscription,
{
    /// Delegates to the [`SubscribeExt`] implementation of the address
    /// obtained from this context.
    fn subscribe(&self, subscription: S) -> Fetcher<StateEntry<S>> {
        let address = self.address();
        address.subscribe(subscription)
    }
}

/// Successful result of a subscription request.
///
/// Marked `#[must_use]` because it owns an [`Entry`], and dropping that entry
/// sends an [`Unsubscribe`] message.
#[must_use]
pub struct StateEntry<S>
where
    S: Subscription,
{
    /// State produced by [`ManageSubscription::subscribe`].
    pub state: S::State,
    /// Handle tied to the subscription; see [`Entry`].
    pub entry: Entry<S>,
}

/// Handle for one subscription.
///
/// When dropped, it sends an [`Unsubscribe`] message carrying `sub_id`
/// through `recipient`.
pub struct Entry<S>
where
    S: Subscription,
{
    /// Identifier of the subscription this entry belongs to.
    sub_id: UniqueId<S>,
    /// Where the [`Unsubscribe`] message is sent on drop.
    recipient: Recipient<Unsubscribe<S>>,
}

impl<S> Drop for Entry<S>
where
    S: Subscription,
{
    /// Sends an [`Unsubscribe`] message for this entry's `sub_id`.
    fn drop(&mut self) {
        let sub_id = self.sub_id.clone();
        // Best effort: a failed send is deliberately ignored, since `drop`
        // has no way to report it.
        let _ = self.recipient.send(Unsubscribe { sub_id });
    }
}

/// A subscription request type.
///
/// The implementor itself is what callers pass to [`SubscribeExt::subscribe`].
pub trait Subscription: Send + Sync + 'static {
    /// State handed back to the subscriber when the subscription succeeds.
    type State: Send + 'static;
}

/// Agent-side handling of subscriptions of type `S`.
///
/// Implementors normally override [`subscribe`](Self::subscribe) and
/// [`unsubscribe`](Self::unsubscribe); the default bodies of both return an
/// error stating that the method is not implemented.
#[async_trait]
pub trait ManageSubscription<S: Subscription>: Agent {
    /// Processes an incoming [`Subscribe`] message.
    ///
    /// Calls [`subscribe`](Self::subscribe) with the subscription id taken from
    /// the message. On success the returned state is paired with an [`Entry`]
    /// whose recipient is obtained from this agent's address. The outcome,
    /// success or error, is then sent back through the message's responder.
    ///
    /// # Errors
    ///
    /// Returns whatever `send_result` on the responder returns. An error from
    /// `subscribe` is forwarded to the requester, not returned from here.
    async fn handle(&mut self, msg: Subscribe<S>, ctx: &mut Context<Self>) -> Result<()> {
        let sub_id = msg.interplay.request;
        let state_entry = self.subscribe(sub_id.clone(), ctx).await.map(|state| {
            let recipient = ctx.address().sender();
            StateEntry {
                state,
                entry: Entry { sub_id, recipient },
            }
        });
        msg.interplay.responder.send_result(state_entry)
    }

    /// Hook invoked when a subscriber asks for the subscription `sub_id`.
    ///
    /// Returns the state to hand back to the subscriber.
    ///
    /// # Errors
    ///
    /// The default implementation always fails with a "not implemented" error.
    async fn subscribe(
        &mut self,
        sub_id: UniqueId<S>,
        _ctx: &mut Context<Self>,
    ) -> Result<S::State> {
        Err(anyhow!(
            "The subscribe method is not implemented to handle {sub_id}."
        ))
    }

    /// Hook invoked when an [`Unsubscribe`] message for `sub_id` arrives.
    ///
    /// # Errors
    ///
    /// The default implementation always fails with a "not implemented" error.
    async fn unsubscribe(&mut self, sub_id: UniqueId<S>, _ctx: &mut Context<Self>) -> Result<()> {
        Err(anyhow!(
            "The unsubscribe method is not implemented to handle {sub_id}."
        ))
    }
}

/// Message asking an agent to start a subscription of type `S`.
pub struct Subscribe<S>
where
    S: Subscription,
{
    /// Carries the subscription id as the request and expects a
    /// [`StateEntry`] as the response.
    pub interplay: Interplay<UniqueId<S>, StateEntry<S>>,
}

#[async_trait]
impl<A, S> MessageFor<A> for Subscribe<S>
where
    A: ManageSubscription<S>,
    S: Subscription,
{
    /// Unboxes the message and forwards it to [`ManageSubscription::handle`]
    /// on the agent.
    async fn handle(self: Box<Self>, agent: &mut A, ctx: &mut Context<A>) -> Result<()> {
        let request = *self;
        <A as ManageSubscription<S>>::handle(agent, request, ctx).await
    }
}

/// Message asking an agent to end the subscription identified by `sub_id`.
pub struct Unsubscribe<S>
where
    S: Subscription,
{
    /// Identifier of the subscription to end.
    pub sub_id: UniqueId<S>,
}

#[async_trait]
impl<A, S> MessageFor<A> for Unsubscribe<S>
where
    A: ManageSubscription<S>,
    S: Subscription,
{
    async fn handle(self: Box<Self>, agent: &mut A, ctx: &mut Context<A>) -> Result<()> {
        agent.unsubscribe(self.sub_id, ctx).await
    }
}