1mod fronted_http;
2mod race;
3
4#[cfg(feature = "aws_lambda")]
5mod aws_lambda;
6
7use anyctx::AnyCtx;
8use anyhow::Context;
9
10#[cfg(feature = "aws_lambda")]
11use aws_lambda::AwsLambdaTransport;
12use fronted_http::FrontedHttpTransport;
13use geph5_broker_protocol::BrokerClient;
14use itertools::Itertools;
15use nanorpc::DynRpcTransport;
16use race::RaceTransport;
17
18use serde::{Deserialize, Serialize};
19use sillad::tcp::TcpDialer;
20use std::net::SocketAddr;
21
22use crate::client::{Config, CtxField};
23
24#[derive(Serialize, Deserialize, Clone)]
25#[serde(rename_all = "snake_case")]
26pub enum BrokerSource {
27 Direct(String),
28 Fronted {
29 front: String,
30 host: String,
31 },
32 DirectTcp(SocketAddr),
33 #[cfg(feature = "aws_lambda")]
34 AwsLambda {
35 function_name: String,
36 region: String,
37 access_key_id: String,
38 secret_access_key: String,
39 },
40 Race(Vec<BrokerSource>),
41}
42
43impl BrokerSource {
44 pub fn rpc_transport(&self) -> DynRpcTransport {
46 match self {
47 BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
48 url: s.clone(),
49 host: None,
50 }),
51 BrokerSource::DirectTcp(dest_addr) => {
52 DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
53 dest_addr: *dest_addr,
54 }))
55 }
56 BrokerSource::Fronted { front, host } => DynRpcTransport::new(FrontedHttpTransport {
57 url: front.clone(),
58 host: Some(host.clone()),
59 }),
60 #[cfg(feature = "aws_lambda")]
61 BrokerSource::AwsLambda {
62 function_name,
63 region,
64 access_key_id,
65 secret_access_key,
66 } => DynRpcTransport::new(AwsLambdaTransport {
67 function_name: function_name.clone(),
68 region: region.clone(),
69 access_key_id: access_key_id.clone(),
70 secret_access_key: secret_access_key.clone(),
71 }),
72 BrokerSource::Race(race_between) => {
73 let transports = race_between
74 .iter()
75 .map(|bs| bs.rpc_transport())
76 .collect_vec();
77 DynRpcTransport::new(RaceTransport::new(transports))
78 }
79 }
80 }
81}
82
83pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
84 ctx.get(BROKER_CLIENT).as_ref().context(
85 "broker information not provided, so cannot use any broker-dependent functionality",
86 )
87}
88
89static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
90 ctx.init()
91 .broker
92 .as_ref()
93 .map(|src| BrokerClient::from(src.rpc_transport()))
94};