geph5_client/
broker.rs

1mod fronted_http;
2mod race;
3
4#[cfg(feature = "aws_lambda")]
5mod aws_lambda;
6
7use anyctx::AnyCtx;
8use anyhow::Context;
9
10#[cfg(feature = "aws_lambda")]
11use aws_lambda::AwsLambdaTransport;
12use fronted_http::FrontedHttpTransport;
13use geph5_broker_protocol::BrokerClient;
14use itertools::Itertools;
15use nanorpc::DynRpcTransport;
16use race::RaceTransport;
17
18use serde::{Deserialize, Serialize};
19use sillad::tcp::TcpDialer;
20use std::net::SocketAddr;
21
22use crate::client::{Config, CtxField};
23
24#[derive(Serialize, Deserialize, Clone)]
25#[serde(rename_all = "snake_case")]
26pub enum BrokerSource {
27    Direct(String),
28    Fronted {
29        front: String,
30        host: String,
31    },
32    DirectTcp(SocketAddr),
33    #[cfg(feature = "aws_lambda")]
34    AwsLambda {
35        function_name: String,
36        region: String,
37        access_key_id: String,
38        secret_access_key: String,
39    },
40    Race(Vec<BrokerSource>),
41}
42
43impl BrokerSource {
44    /// Converts to a RpcTransport.
45    pub fn rpc_transport(&self) -> DynRpcTransport {
46        match self {
47            BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
48                url: s.clone(),
49                host: None,
50            }),
51            BrokerSource::DirectTcp(dest_addr) => {
52                DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
53                    dest_addr: *dest_addr,
54                }))
55            }
56            BrokerSource::Fronted { front, host } => DynRpcTransport::new(FrontedHttpTransport {
57                url: front.clone(),
58                host: Some(host.clone()),
59            }),
60            #[cfg(feature = "aws_lambda")]
61            BrokerSource::AwsLambda {
62                function_name,
63                region,
64                access_key_id,
65                secret_access_key,
66            } => DynRpcTransport::new(AwsLambdaTransport {
67                function_name: function_name.clone(),
68                region: region.clone(),
69                access_key_id: access_key_id.clone(),
70                secret_access_key: secret_access_key.clone(),
71            }),
72            BrokerSource::Race(race_between) => {
73                let transports = race_between
74                    .iter()
75                    .map(|bs| bs.rpc_transport())
76                    .collect_vec();
77                DynRpcTransport::new(RaceTransport::new(transports))
78            }
79        }
80    }
81}
82
83pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
84    ctx.get(BROKER_CLIENT).as_ref().context(
85        "broker information not provided, so cannot use any broker-dependent functionality",
86    )
87}
88
89static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
90    ctx.init()
91        .broker
92        .as_ref()
93        .map(|src| BrokerClient::from(src.rpc_transport()))
94};