rill_client/actors/client/
actor.rs1mod wait_ready;
2
3use anyhow::Error;
4use async_trait::async_trait;
5use derive_more::From;
6use meio::{
7 ActionHandler, Actor, Address, Context, IdOf, InstantActionHandler, InterruptedBy, StartedBy,
8 TaskEliminated, TaskError,
9};
10use meio_connect::{
11 client::{WsClient, WsClientStatus, WsSender},
12 WsIncoming,
13};
14use rill_protocol::io::client::{
15 ClientProtocol, ClientRequest, ClientResponse, ClientServiceRequest, ClientServiceResponse,
16};
17use rill_protocol::io::transport::ServiceEnvelope;
18use std::collections::VecDeque;
19use std::time::Duration;
20
21type WsOutgoing = WsSender<ServiceEnvelope<ClientProtocol, ClientRequest, ClientServiceResponse>>;
22
23#[derive(From)]
24pub struct RillClientLink {
25 address: Address<RillClient>,
26}
27
28pub struct RillClient {
29 url: String,
30 sender: Option<WsOutgoing>,
31 awaiting_clients: VecDeque<wait_ready::Notifier>,
32}
33
34impl RillClient {
35 pub fn new(url: Option<String>) -> Self {
36 let url = url.unwrap_or_else(|| "http://localhost:1636".into());
37 Self {
38 url,
39 sender: None,
40 awaiting_clients: VecDeque::new(),
41 }
42 }
43}
44
45#[derive(Debug, Clone, PartialEq, Eq, Hash)]
46pub enum Group {
47 WsConnection,
48}
49
50impl Actor for RillClient {
51 type GroupBy = Group;
52}
53
54#[async_trait]
55impl<T: Actor> StartedBy<T> for RillClient {
56 async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
57 ctx.termination_sequence(vec![Group::WsConnection]);
59
60 let client = WsClient::new(
61 self.url.clone(),
62 Some(Duration::from_secs(1)),
63 ctx.address().clone(),
64 );
65 ctx.spawn_task(client, (), Group::WsConnection);
66
67 Ok(())
68 }
69}
70
71#[async_trait]
72impl<T: Actor> InterruptedBy<T> for RillClient {
73 async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
74 ctx.shutdown();
75 Ok(())
76 }
77}
78
79#[async_trait]
80impl InstantActionHandler<WsClientStatus<ClientProtocol>> for RillClient {
81 async fn handle(
82 &mut self,
83 status: WsClientStatus<ClientProtocol>,
84 _ctx: &mut Context<Self>,
85 ) -> Result<(), Error> {
86 match status {
87 WsClientStatus::Connected { sender } => {
88 self.sender = Some(sender);
89 self.notify_awaiting_clients();
90 }
91 WsClientStatus::Failed { reason } => {
92 log::error!("Connection failed: {}", reason);
93 self.sender.take();
94 }
95 }
96 Ok(())
97 }
98}
99
100#[async_trait]
101impl
102 ActionHandler<WsIncoming<ServiceEnvelope<ClientProtocol, ClientResponse, ClientServiceRequest>>>
103 for RillClient
104{
105 async fn handle(
106 &mut self,
107 msg: WsIncoming<ServiceEnvelope<ClientProtocol, ClientResponse, ClientServiceRequest>>,
108 _ctx: &mut Context<Self>,
109 ) -> Result<(), Error> {
110 log::trace!("Incoming to exporter: {:?}", msg);
111 match msg.0 {
112 ServiceEnvelope::Envelope(envelope) => {
113 let _direct_id = envelope.direct_id;
114 match envelope.data {
115 ClientResponse::Declare(entry_id) => {
116 log::info!("Connected to: {}", entry_id);
117 }
118 evt => {
119 log::error!("Not implemented for {:?}", evt);
120 }
121 }
122 }
123 ServiceEnvelope::Service(_) => {
124 log::error!("Service message is not supported yet.");
125 }
126 }
127 Ok(())
128 }
129}
130
131#[async_trait]
132impl TaskEliminated<WsClient<ClientProtocol, Self>, ()> for RillClient {
133 async fn handle(
134 &mut self,
135 _id: IdOf<WsClient<ClientProtocol, Self>>,
136 _tag: (),
137 _result: Result<(), TaskError>,
138 _ctx: &mut Context<Self>,
139 ) -> Result<(), Error> {
140 Ok(())
142 }
143}