1use std::fmt;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use futures::TryFutureExt;
6use log::trace;
7
8use tc_error::*;
9use tc_transact::TxnId;
10use tc_value::{Host, ToUrl, Value};
11use tcgeneric::Map;
12
13use crate::kernel::Kernel;
14use crate::{Actor, State, Txn};
15
/// A policy which decides whether a request to a given link is permitted.
///
/// Implementors must be `Send + Sync` and implement [`fmt::Debug`]; the debug
/// representation is included in the error message when a request is refused.
pub trait Egress: Send + Sync + fmt::Debug {
    /// Return `true` if a request to `link` is authorized under this policy.
    ///
    /// In this file, `Client` passes `write = false` for `get` and
    /// `write = true` for `put`, `post` and `delete`.
    fn is_authorized(&self, link: &ToUrl<'_>, write: bool) -> bool;
}
19
20#[async_trait]
21pub trait RPCClient: Send + Sync {
22 fn extract_jwt(&self, txn: &Txn) -> Option<String> {
23 txn.token().map(|token| token.jwt().to_string())
24 }
25
26 async fn fetch(&self, txn_id: TxnId, link: ToUrl<'_>, actor_id: Value) -> TCResult<Actor>;
27
28 async fn get(&self, txn: &Txn, link: ToUrl<'_>, key: Value) -> TCResult<State>;
29
30 async fn put(&self, txn: &Txn, link: ToUrl<'_>, key: Value, value: State) -> TCResult<()>;
31
32 async fn post(&self, txn: &Txn, link: ToUrl<'_>, params: Map<State>) -> TCResult<State>;
33
34 async fn delete(&self, txn: &Txn, link: ToUrl<'_>, key: Value) -> TCResult<()>;
35}
36
/// An [`RPCClient`] which handles loopback requests with the local [`Kernel`]
/// and delegates every other request to the wrapped client.
#[derive(Clone)]
struct ClientInner {
    /// The port compared against a link's port when detecting loopback requests.
    port: u16,
    /// The kernel which handles loopback requests.
    kernel: Arc<Kernel>,
    /// The client to which non-loopback requests are delegated.
    client: Arc<dyn RPCClient>,
}
43
44impl ClientInner {
45 fn new(port: u16, kernel: Arc<Kernel>, client: Arc<dyn RPCClient>) -> Self {
46 Self {
47 port,
48 kernel,
49 client,
50 }
51 }
52
53 #[inline]
54 fn is_loopback(&self, link: &ToUrl) -> bool {
55 link.host()
57 .map(|host| host.is_localhost() && host.port() == Some(self.port))
58 .unwrap_or(true)
59 }
60}
61
62#[async_trait]
63impl RPCClient for ClientInner {
64 async fn fetch(&self, txn_id: TxnId, link: ToUrl<'_>, actor_id: Value) -> TCResult<Actor> {
65 trace!(
66 "fetch actor {actor_id:?} at {link} (loopback: {})",
67 self.is_loopback(&link)
68 );
69
70 if self.is_loopback(&link) {
71 let public_key = self
72 .kernel
73 .public_key(txn_id, link.path())
74 .map_err(rjwt::Error::fetch)
75 .await?;
76
77 Ok(Actor::with_public_key(actor_id.clone(), public_key))
78 } else {
79 self.client.fetch(txn_id, link, actor_id).await
80 }
81 }
82
83 async fn get(&self, txn: &Txn, link: ToUrl<'_>, key: Value) -> TCResult<State> {
84 if self.is_loopback(&link) {
85 let endpoint = self.kernel.route(link.path(), txn).await?;
86 let handler = endpoint.get(key)?;
87 handler.await
88 } else {
89 self.client.get(txn, link, key).await
90 }
91 }
92
93 async fn put(&self, txn: &Txn, link: ToUrl<'_>, key: Value, value: State) -> TCResult<()> {
94 if self.is_loopback(&link) {
95 let endpoint = self.kernel.route(link.path(), txn).await?;
96 let handler = endpoint.put(key, value)?;
97 handler.await
98 } else {
99 self.client.put(txn, link, key, value).await
100 }
101 }
102
103 async fn post(&self, txn: &Txn, link: ToUrl<'_>, params: Map<State>) -> TCResult<State> {
104 if self.is_loopback(&link) {
105 let endpoint = self.kernel.route(link.path(), txn).await?;
106 let handler = endpoint.post(params)?;
107 handler.await
108 } else {
109 self.client.post(txn, link, params).await
110 }
111 }
112
113 async fn delete(&self, txn: &Txn, link: ToUrl<'_>, key: Value) -> TCResult<()> {
114 if self.is_loopback(&link) {
115 let endpoint = self.kernel.route(link.path(), txn).await?;
116 let handler = endpoint.delete(key)?;
117 handler.await
118 } else {
119 self.client.delete(txn, link, key).await
120 }
121 }
122}
123
/// An [`RPCClient`] which optionally checks an [`Egress`] policy before
/// forwarding `get`, `put`, `post` and `delete` requests to its inner client.
#[derive(Clone)]
pub(crate) struct Client {
    /// The host this client was constructed with, as returned by `Client::host`.
    host: Host,
    /// The inner client to which requests are forwarded.
    client: Arc<dyn RPCClient>,
    /// The egress policy to enforce; when `None`, every request is authorized.
    egress: Option<Arc<dyn Egress>>,
}
132
133impl Client {
134 pub fn new(host: Host, kernel: Arc<Kernel>, client: Arc<dyn RPCClient>) -> Self {
135 let client = ClientInner::new(host.port().expect("port"), kernel, client);
136
137 Self {
138 host,
139 client: Arc::new(client),
140 egress: None,
141 }
142 }
143
144 pub fn with_egress(self, egress: Arc<dyn Egress>) -> Self {
145 Self {
146 host: self.host,
147 client: self.client,
148 egress: Some(egress),
149 }
150 }
151
152 pub fn host(&self) -> &Host {
153 &self.host
154 }
155
156 #[inline]
157 fn authorize(&self, link: &ToUrl<'_>, write: bool) -> TCResult<()> {
158 if let Some(policy) = &self.egress {
159 if policy.is_authorized(&link, write) {
160 Ok(())
161 } else {
162 Err(unauthorized!(
163 "egress to {} (egress policy: {:?})",
164 link,
165 policy
166 ))
167 }
168 } else {
169 Ok(())
171 }
172 }
173}
174
175#[async_trait]
176impl RPCClient for Client {
177 async fn fetch(&self, txn_id: TxnId, link: ToUrl<'_>, actor_id: Value) -> TCResult<Actor> {
178 self.client.fetch(txn_id, link, actor_id).await
179 }
180
181 async fn get(&self, txn: &Txn, link: ToUrl<'_>, key: Value) -> TCResult<State> {
182 self.authorize(&link, false)?;
183 self.client.get(txn, link, key).await
184 }
185
186 async fn put(&self, txn: &Txn, link: ToUrl<'_>, key: Value, value: State) -> TCResult<()> {
187 self.authorize(&link, true)?;
188 self.client.put(txn, link, key, value).await
189 }
190
191 async fn post(&self, txn: &Txn, link: ToUrl<'_>, params: Map<State>) -> TCResult<State> {
192 self.authorize(&link, true)?;
193 self.client.post(txn, link, params).await
194 }
195
196 async fn delete(&self, txn: &Txn, link: ToUrl<'_>, key: Value) -> TCResult<()> {
197 self.authorize(&link, true)?;
198 self.client.delete(txn, link, key).await
199 }
200}