geph5_client/
broker.rs

1mod aws_lambda;
2mod fronted_http;
3mod race;
4
5use anyctx::AnyCtx;
6use anyhow::Context;
7
8use aws_lambda::AwsLambdaTransport;
9use fronted_http::FrontedHttpTransport;
10use geph5_broker_protocol::BrokerClient;
11use itertools::Itertools;
12use nanorpc::DynRpcTransport;
13use race::RaceTransport;
14use reqwest::Client;
15use serde::{Deserialize, Serialize};
16use sillad::tcp::TcpDialer;
17use std::net::SocketAddr;
18
19use crate::client::{Config, CtxField};
20
21#[derive(Serialize, Deserialize, Clone)]
22#[serde(rename_all = "snake_case")]
23pub enum BrokerSource {
24    Direct(String),
25    Fronted {
26        front: String,
27        host: String,
28    },
29    DirectTcp(SocketAddr),
30    AwsLambda {
31        function_name: String,
32        region: String,
33        access_key_id: String,
34        secret_access_key: String,
35    },
36    Race(Vec<BrokerSource>),
37}
38
39impl BrokerSource {
40    /// Converts to a RpcTransport.
41    pub fn rpc_transport(&self) -> DynRpcTransport {
42        let client = Client::builder().no_proxy().build().unwrap();
43        match self {
44            BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
45                url: s.clone(),
46                host: None,
47                client,
48            }),
49            BrokerSource::DirectTcp(dest_addr) => {
50                DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
51                    dest_addr: *dest_addr,
52                }))
53            }
54            BrokerSource::Fronted { front, host } => DynRpcTransport::new(FrontedHttpTransport {
55                url: front.clone(),
56                host: Some(host.clone()),
57                client,
58            }),
59            BrokerSource::AwsLambda {
60                function_name,
61                region,
62                access_key_id,
63                secret_access_key,
64            } => DynRpcTransport::new(AwsLambdaTransport {
65                function_name: function_name.clone(),
66                region: region.clone(),
67                access_key_id: access_key_id.clone(),
68                secret_access_key: secret_access_key.clone(),
69            }),
70            BrokerSource::Race(race_between) => {
71                let transports = race_between
72                    .iter()
73                    .map(|bs| bs.rpc_transport())
74                    .collect_vec();
75                DynRpcTransport::new(RaceTransport::new(transports))
76            }
77        }
78    }
79}
80
81pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
82    ctx.get(BROKER_CLIENT).as_ref().context(
83        "broker information not provided, so cannot use any broker-dependent functionality",
84    )
85}
86
87static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
88    ctx.init()
89        .broker
90        .as_ref()
91        .map(|src| BrokerClient::from(src.rpc_transport()))
92};