geph5_client/
broker.rs

1mod fronted_http;
2mod priority_race;
3mod race;
4
5#[cfg(feature = "aws_lambda")]
6mod aws_lambda;
7
8use anyctx::AnyCtx;
9use anyhow::Context;
10
11#[cfg(feature = "aws_lambda")]
12use aws_lambda::AwsLambdaTransport;
13use fronted_http::FrontedHttpTransport;
14use geph5_broker_protocol::BrokerClient;
15use itertools::Itertools;
16use nanorpc::DynRpcTransport;
17use priority_race::PriorityRaceTransport;
18use race::RaceTransport;
19
20use serde::{Deserialize, Serialize};
21use sillad::tcp::TcpDialer;
22use std::{collections::BTreeMap, net::SocketAddr};
23
24use crate::client::{Config, CtxField};
25
26#[derive(Serialize, Deserialize, Clone)]
27#[serde(rename_all = "snake_case")]
28pub enum BrokerSource {
29    Direct(String),
30    Fronted {
31        front: String,
32        host: String,
33    },
34    DirectTcp(SocketAddr),
35    #[cfg(feature = "aws_lambda")]
36    AwsLambda {
37        function_name: String,
38        region: String,
39        access_key_id: String,
40        secret_access_key: String,
41    },
42    Race(Vec<BrokerSource>),
43    PriorityRace(BTreeMap<u64, BrokerSource>),
44}
45
46impl BrokerSource {
47    /// Converts to a RpcTransport.
48    pub fn rpc_transport(&self) -> DynRpcTransport {
49        match self {
50            BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
51                url: s.clone(),
52                host: None,
53            }),
54            BrokerSource::DirectTcp(dest_addr) => {
55                DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
56                    dest_addr: *dest_addr,
57                }))
58            }
59            BrokerSource::Fronted { front, host } => DynRpcTransport::new(FrontedHttpTransport {
60                url: front.clone(),
61                host: Some(host.clone()),
62            }),
63            #[cfg(feature = "aws_lambda")]
64            BrokerSource::AwsLambda {
65                function_name,
66                region,
67                access_key_id,
68                secret_access_key,
69            } => DynRpcTransport::new(AwsLambdaTransport {
70                function_name: function_name.clone(),
71                region: region.clone(),
72                access_key_id: access_key_id.clone(),
73                secret_access_key: secret_access_key.clone(),
74            }),
75            BrokerSource::Race(race_between) => {
76                let transports = race_between
77                    .iter()
78                    .map(|bs| bs.rpc_transport())
79                    .collect_vec();
80                DynRpcTransport::new(RaceTransport::new(transports))
81            }
82            BrokerSource::PriorityRace(inner) => {
83                let inner = inner.iter().map(|(k, v)| (*k, v.rpc_transport())).collect();
84                DynRpcTransport::new(PriorityRaceTransport::new(inner))
85            }
86        }
87    }
88}
89
90pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
91    ctx.get(BROKER_CLIENT).as_ref().context(
92        "broker information not provided, so cannot use any broker-dependent functionality",
93    )
94}
95
96static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
97    ctx.init()
98        .broker
99        .as_ref()
100        .map(|src| BrokerClient::from(src.rpc_transport()))
101};