geph5_client/
broker.rs

1mod aws_lambda;
2mod fronted_http;
3mod race;
4
5use anyctx::AnyCtx;
6use anyhow::Context;
7
8use aws_lambda::AwsLambdaTransport;
9use fronted_http::FrontedHttpTransport;
10use geph5_broker_protocol::BrokerClient;
11use itertools::Itertools;
12use nanorpc::DynRpcTransport;
13use race::RaceTransport;
14
15use serde::{Deserialize, Serialize};
16use sillad::tcp::TcpDialer;
17use std::net::SocketAddr;
18
19use crate::client::{Config, CtxField};
20
/// Describes how the client reaches the broker.
///
/// Variant names are (de)serialized in `snake_case` (e.g. `direct_tcp`, `aws_lambda`).
/// `BrokerSource::rpc_transport` turns a value of this type into an RPC transport.
#[derive(Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum BrokerSource {
    /// A URL handed to the fronted HTTP transport with no host override.
    Direct(String),
    /// A fronted HTTP endpoint: `front` is used as the transport URL and `host` is passed
    /// as the transport's host override.
    Fronted {
        front: String,
        host: String,
    },
    /// A socket address that is dialed over plain TCP.
    DirectTcp(SocketAddr),
    /// Parameters forwarded verbatim to the AWS Lambda transport.
    // NOTE(review): `secret_access_key` is a credential; take care before logging or
    // printing values of this type.
    AwsLambda {
        function_name: String,
        region: String,
        access_key_id: String,
        secret_access_key: String,
    },
    /// Several sources combined into one race transport; each inner source is converted
    /// to its own transport first.
    Race(Vec<BrokerSource>),
}
38
39impl BrokerSource {
40    /// Converts to a RpcTransport.
41    pub fn rpc_transport(&self) -> DynRpcTransport {
42        match self {
43            BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
44                url: s.clone(),
45                host: None,
46            }),
47            BrokerSource::DirectTcp(dest_addr) => {
48                DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
49                    dest_addr: *dest_addr,
50                }))
51            }
52            BrokerSource::Fronted { front, host } => DynRpcTransport::new(FrontedHttpTransport {
53                url: front.clone(),
54                host: Some(host.clone()),
55            }),
56            BrokerSource::AwsLambda {
57                function_name,
58                region,
59                access_key_id,
60                secret_access_key,
61            } => DynRpcTransport::new(AwsLambdaTransport {
62                function_name: function_name.clone(),
63                region: region.clone(),
64                access_key_id: access_key_id.clone(),
65                secret_access_key: secret_access_key.clone(),
66            }),
67            BrokerSource::Race(race_between) => {
68                let transports = race_between
69                    .iter()
70                    .map(|bs| bs.rpc_transport())
71                    .collect_vec();
72                DynRpcTransport::new(RaceTransport::new(transports))
73            }
74        }
75    }
76}
77
78pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
79    ctx.get(BROKER_CLIENT).as_ref().context(
80        "broker information not provided, so cannot use any broker-dependent functionality",
81    )
82}
83
84static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
85    ctx.init()
86        .broker
87        .as_ref()
88        .map(|src| BrokerClient::from(src.rpc_transport()))
89};