tc_server/
client.rs

1use std::fmt;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use futures::TryFutureExt;
6use log::trace;
7
8use tc_error::*;
9use tc_transact::TxnId;
10use tc_value::{Host, ToUrl, Value};
11use tcgeneric::Map;
12
13use crate::kernel::Kernel;
14use crate::{Actor, State, Txn};
15
16pub trait Egress: Send + Sync + fmt::Debug {
17    fn is_authorized(&self, link: &ToUrl<'_>, write: bool) -> bool;
18}
19
20#[async_trait]
21pub trait RPCClient: Send + Sync {
22    fn extract_jwt(&self, txn: &Txn) -> Option<String> {
23        txn.token().map(|token| token.jwt().to_string())
24    }
25
26    async fn fetch(&self, txn_id: TxnId, link: ToUrl<'_>, actor_id: Value) -> TCResult<Actor>;
27
28    async fn get(&self, txn: &Txn, link: ToUrl<'_>, key: Value) -> TCResult<State>;
29
30    async fn put(&self, txn: &Txn, link: ToUrl<'_>, key: Value, value: State) -> TCResult<()>;
31
32    async fn post(&self, txn: &Txn, link: ToUrl<'_>, params: Map<State>) -> TCResult<State>;
33
34    async fn delete(&self, txn: &Txn, link: ToUrl<'_>, key: Value) -> TCResult<()>;
35}
36
37#[derive(Clone)]
38struct ClientInner {
39    port: u16,
40    kernel: Arc<Kernel>,
41    client: Arc<dyn RPCClient>,
42}
43
44impl ClientInner {
45    fn new(port: u16, kernel: Arc<Kernel>, client: Arc<dyn RPCClient>) -> Self {
46        Self {
47            port,
48            kernel,
49            client,
50        }
51    }
52
53    #[inline]
54    fn is_loopback(&self, link: &ToUrl) -> bool {
55        // todo: check if the port matches the default port for the protocol (e.g. 80 for HTTP)
56        link.host()
57            .map(|host| host.is_localhost() && host.port() == Some(self.port))
58            .unwrap_or(true)
59    }
60}
61
62#[async_trait]
63impl RPCClient for ClientInner {
64    async fn fetch(&self, txn_id: TxnId, link: ToUrl<'_>, actor_id: Value) -> TCResult<Actor> {
65        trace!(
66            "fetch actor {actor_id:?} at {link} (loopback: {})",
67            self.is_loopback(&link)
68        );
69
70        if self.is_loopback(&link) {
71            let public_key = self
72                .kernel
73                .public_key(txn_id, link.path())
74                .map_err(rjwt::Error::fetch)
75                .await?;
76
77            Ok(Actor::with_public_key(actor_id.clone(), public_key))
78        } else {
79            self.client.fetch(txn_id, link, actor_id).await
80        }
81    }
82
83    async fn get(&self, txn: &Txn, link: ToUrl<'_>, key: Value) -> TCResult<State> {
84        if self.is_loopback(&link) {
85            let endpoint = self.kernel.route(link.path(), txn).await?;
86            let handler = endpoint.get(key)?;
87            handler.await
88        } else {
89            self.client.get(txn, link, key).await
90        }
91    }
92
93    async fn put(&self, txn: &Txn, link: ToUrl<'_>, key: Value, value: State) -> TCResult<()> {
94        if self.is_loopback(&link) {
95            let endpoint = self.kernel.route(link.path(), txn).await?;
96            let handler = endpoint.put(key, value)?;
97            handler.await
98        } else {
99            self.client.put(txn, link, key, value).await
100        }
101    }
102
103    async fn post(&self, txn: &Txn, link: ToUrl<'_>, params: Map<State>) -> TCResult<State> {
104        if self.is_loopback(&link) {
105            let endpoint = self.kernel.route(link.path(), txn).await?;
106            let handler = endpoint.post(params)?;
107            handler.await
108        } else {
109            self.client.post(txn, link, params).await
110        }
111    }
112
113    async fn delete(&self, txn: &Txn, link: ToUrl<'_>, key: Value) -> TCResult<()> {
114        if self.is_loopback(&link) {
115            let endpoint = self.kernel.route(link.path(), txn).await?;
116            let handler = endpoint.delete(key)?;
117            handler.await
118        } else {
119            self.client.delete(txn, link, key).await
120        }
121    }
122}
123
124// use an additional struct to provide indirection when calling RPCClient methods
125// in order to minimize build time
126#[derive(Clone)]
127pub(crate) struct Client {
128    host: Host,
129    client: Arc<dyn RPCClient>,
130    egress: Option<Arc<dyn Egress>>,
131}
132
133impl Client {
134    pub fn new(host: Host, kernel: Arc<Kernel>, client: Arc<dyn RPCClient>) -> Self {
135        let client = ClientInner::new(host.port().expect("port"), kernel, client);
136
137        Self {
138            host,
139            client: Arc::new(client),
140            egress: None,
141        }
142    }
143
144    pub fn with_egress(self, egress: Arc<dyn Egress>) -> Self {
145        Self {
146            host: self.host,
147            client: self.client,
148            egress: Some(egress),
149        }
150    }
151
152    pub fn host(&self) -> &Host {
153        &self.host
154    }
155
156    #[inline]
157    fn authorize(&self, link: &ToUrl<'_>, write: bool) -> TCResult<()> {
158        if let Some(policy) = &self.egress {
159            if policy.is_authorized(&link, write) {
160                Ok(())
161            } else {
162                Err(unauthorized!(
163                    "egress to {} (egress policy: {:?})",
164                    link,
165                    policy
166                ))
167            }
168        } else {
169            // TODO: enforce egress whitelist
170            Ok(())
171        }
172    }
173}
174
175#[async_trait]
176impl RPCClient for Client {
177    async fn fetch(&self, txn_id: TxnId, link: ToUrl<'_>, actor_id: Value) -> TCResult<Actor> {
178        self.client.fetch(txn_id, link, actor_id).await
179    }
180
181    async fn get(&self, txn: &Txn, link: ToUrl<'_>, key: Value) -> TCResult<State> {
182        self.authorize(&link, false)?;
183        self.client.get(txn, link, key).await
184    }
185
186    async fn put(&self, txn: &Txn, link: ToUrl<'_>, key: Value, value: State) -> TCResult<()> {
187        self.authorize(&link, true)?;
188        self.client.put(txn, link, key, value).await
189    }
190
191    async fn post(&self, txn: &Txn, link: ToUrl<'_>, params: Map<State>) -> TCResult<State> {
192        self.authorize(&link, true)?;
193        self.client.post(txn, link, params).await
194    }
195
196    async fn delete(&self, txn: &Txn, link: ToUrl<'_>, key: Value) -> TCResult<()> {
197        self.authorize(&link, true)?;
198        self.client.delete(txn, link, key).await
199    }
200}