1mod fronted_http;
2mod priority_race;
3mod race;
4
5#[cfg(feature = "aws_lambda")]
6mod aws_lambda;
7
8use anyctx::AnyCtx;
9use anyhow::Context;
10
11#[cfg(feature = "aws_lambda")]
12use aws_lambda::AwsLambdaTransport;
13use fronted_http::FrontedHttpTransport;
14use geph5_broker_protocol::BrokerClient;
15use itertools::Itertools;
16use nanorpc::DynRpcTransport;
17use priority_race::PriorityRaceTransport;
18use race::RaceTransport;
19
20use serde::{Deserialize, Serialize};
21use sillad::tcp::TcpDialer;
22use std::{collections::BTreeMap, net::SocketAddr};
23
24use crate::client::{Config, CtxField};
25
26#[derive(Serialize, Deserialize, Clone)]
27#[serde(rename_all = "snake_case")]
28pub enum BrokerSource {
29 Direct(String),
30 Fronted {
31 front: String,
32 host: String,
33 },
34 DirectTcp(SocketAddr),
35 #[cfg(feature = "aws_lambda")]
36 AwsLambda {
37 function_name: String,
38 region: String,
39 access_key_id: String,
40 secret_access_key: String,
41 },
42 Race(Vec<BrokerSource>),
43 PriorityRace(BTreeMap<u64, BrokerSource>),
44}
45
46impl BrokerSource {
47 pub fn rpc_transport(&self) -> DynRpcTransport {
49 match self {
50 BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
51 url: s.clone(),
52 host: None,
53 }),
54 BrokerSource::DirectTcp(dest_addr) => {
55 DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
56 dest_addr: *dest_addr,
57 }))
58 }
59 BrokerSource::Fronted { front, host } => DynRpcTransport::new(FrontedHttpTransport {
60 url: front.clone(),
61 host: Some(host.clone()),
62 }),
63 #[cfg(feature = "aws_lambda")]
64 BrokerSource::AwsLambda {
65 function_name,
66 region,
67 access_key_id,
68 secret_access_key,
69 } => DynRpcTransport::new(AwsLambdaTransport {
70 function_name: function_name.clone(),
71 region: region.clone(),
72 access_key_id: access_key_id.clone(),
73 secret_access_key: secret_access_key.clone(),
74 }),
75 BrokerSource::Race(race_between) => {
76 let transports = race_between
77 .iter()
78 .map(|bs| bs.rpc_transport())
79 .collect_vec();
80 DynRpcTransport::new(RaceTransport::new(transports))
81 }
82 BrokerSource::PriorityRace(inner) => {
83 let inner = inner.iter().map(|(k, v)| (*k, v.rpc_transport())).collect();
84 DynRpcTransport::new(PriorityRaceTransport::new(inner))
85 }
86 }
87 }
88}
89
90pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
91 ctx.get(BROKER_CLIENT).as_ref().context(
92 "broker information not provided, so cannot use any broker-dependent functionality",
93 )
94}
95
96static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
97 ctx.init()
98 .broker
99 .as_ref()
100 .map(|src| BrokerClient::from(src.rpc_transport()))
101};