rill_client/actors/client/
actor.rs

1mod wait_ready;
2
3use anyhow::Error;
4use async_trait::async_trait;
5use derive_more::From;
6use meio::{
7    ActionHandler, Actor, Address, Context, IdOf, InstantActionHandler, InterruptedBy, StartedBy,
8    TaskEliminated, TaskError,
9};
10use meio_connect::{
11    client::{WsClient, WsClientStatus, WsSender},
12    WsIncoming,
13};
14use rill_protocol::io::client::{
15    ClientProtocol, ClientRequest, ClientResponse, ClientServiceRequest, ClientServiceResponse,
16};
17use rill_protocol::io::transport::ServiceEnvelope;
18use std::collections::VecDeque;
19use std::time::Duration;
20
21type WsOutgoing = WsSender<ServiceEnvelope<ClientProtocol, ClientRequest, ClientServiceResponse>>;
22
23#[derive(From)]
24pub struct RillClientLink {
25    address: Address<RillClient>,
26}
27
28pub struct RillClient {
29    url: String,
30    sender: Option<WsOutgoing>,
31    awaiting_clients: VecDeque<wait_ready::Notifier>,
32}
33
34impl RillClient {
35    pub fn new(url: Option<String>) -> Self {
36        let url = url.unwrap_or_else(|| "http://localhost:1636".into());
37        Self {
38            url,
39            sender: None,
40            awaiting_clients: VecDeque::new(),
41        }
42    }
43}
44
/// Task groups of the client actor; used as its `Actor::GroupBy` type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Group {
    /// Group that the `WsClient` connection task is spawned into.
    WsConnection,
}
49
50impl Actor for RillClient {
51    type GroupBy = Group;
52}
53
54#[async_trait]
55impl<T: Actor> StartedBy<T> for RillClient {
56    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
57        // TODO: Use `strum` here
58        ctx.termination_sequence(vec![Group::WsConnection]);
59
60        let client = WsClient::new(
61            self.url.clone(),
62            Some(Duration::from_secs(1)),
63            ctx.address().clone(),
64        );
65        ctx.spawn_task(client, (), Group::WsConnection);
66
67        Ok(())
68    }
69}
70
71#[async_trait]
72impl<T: Actor> InterruptedBy<T> for RillClient {
73    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
74        ctx.shutdown();
75        Ok(())
76    }
77}
78
79#[async_trait]
80impl InstantActionHandler<WsClientStatus<ClientProtocol>> for RillClient {
81    async fn handle(
82        &mut self,
83        status: WsClientStatus<ClientProtocol>,
84        _ctx: &mut Context<Self>,
85    ) -> Result<(), Error> {
86        match status {
87            WsClientStatus::Connected { sender } => {
88                self.sender = Some(sender);
89                self.notify_awaiting_clients();
90            }
91            WsClientStatus::Failed { reason } => {
92                log::error!("Connection failed: {}", reason);
93                self.sender.take();
94            }
95        }
96        Ok(())
97    }
98}
99
100#[async_trait]
101impl
102    ActionHandler<WsIncoming<ServiceEnvelope<ClientProtocol, ClientResponse, ClientServiceRequest>>>
103    for RillClient
104{
105    async fn handle(
106        &mut self,
107        msg: WsIncoming<ServiceEnvelope<ClientProtocol, ClientResponse, ClientServiceRequest>>,
108        _ctx: &mut Context<Self>,
109    ) -> Result<(), Error> {
110        log::trace!("Incoming to exporter: {:?}", msg);
111        match msg.0 {
112            ServiceEnvelope::Envelope(envelope) => {
113                let _direct_id = envelope.direct_id;
114                match envelope.data {
115                    ClientResponse::Declare(entry_id) => {
116                        log::info!("Connected to: {}", entry_id);
117                    }
118                    evt => {
119                        log::error!("Not implemented for {:?}", evt);
120                    }
121                }
122            }
123            ServiceEnvelope::Service(_) => {
124                log::error!("Service message is not supported yet.");
125            }
126        }
127        Ok(())
128    }
129}
130
131#[async_trait]
132impl TaskEliminated<WsClient<ClientProtocol, Self>, ()> for RillClient {
133    async fn handle(
134        &mut self,
135        _id: IdOf<WsClient<ClientProtocol, Self>>,
136        _tag: (),
137        _result: Result<(), TaskError>,
138        _ctx: &mut Context<Self>,
139    ) -> Result<(), Error> {
140        // TODO: Drop unfinished tasks
141        Ok(())
142    }
143}