1mod aws_lambda;
2mod fronted_http;
3mod race;
4
5use anyctx::AnyCtx;
6use anyhow::Context;
7
8use aws_lambda::AwsLambdaTransport;
9use fronted_http::FrontedHttpTransport;
10use geph5_broker_protocol::BrokerClient;
11use itertools::Itertools;
12use nanorpc::DynRpcTransport;
13use race::RaceTransport;
14
15use serde::{Deserialize, Serialize};
16use sillad::tcp::TcpDialer;
17use std::net::SocketAddr;
18
19use crate::client::{Config, CtxField};
20
21#[derive(Serialize, Deserialize, Clone)]
22#[serde(rename_all = "snake_case")]
23pub enum BrokerSource {
24 Direct(String),
25 Fronted {
26 front: String,
27 host: String,
28 },
29 DirectTcp(SocketAddr),
30 AwsLambda {
31 function_name: String,
32 region: String,
33 access_key_id: String,
34 secret_access_key: String,
35 },
36 Race(Vec<BrokerSource>),
37}
38
39impl BrokerSource {
40 pub fn rpc_transport(&self) -> DynRpcTransport {
42 match self {
43 BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
44 url: s.clone(),
45 host: None,
46 }),
47 BrokerSource::DirectTcp(dest_addr) => {
48 DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
49 dest_addr: *dest_addr,
50 }))
51 }
52 BrokerSource::Fronted { front, host } => DynRpcTransport::new(FrontedHttpTransport {
53 url: front.clone(),
54 host: Some(host.clone()),
55 }),
56 BrokerSource::AwsLambda {
57 function_name,
58 region,
59 access_key_id,
60 secret_access_key,
61 } => DynRpcTransport::new(AwsLambdaTransport {
62 function_name: function_name.clone(),
63 region: region.clone(),
64 access_key_id: access_key_id.clone(),
65 secret_access_key: secret_access_key.clone(),
66 }),
67 BrokerSource::Race(race_between) => {
68 let transports = race_between
69 .iter()
70 .map(|bs| bs.rpc_transport())
71 .collect_vec();
72 DynRpcTransport::new(RaceTransport::new(transports))
73 }
74 }
75 }
76}
77
78pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
79 ctx.get(BROKER_CLIENT).as_ref().context(
80 "broker information not provided, so cannot use any broker-dependent functionality",
81 )
82}
83
84static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
85 ctx.init()
86 .broker
87 .as_ref()
88 .map(|src| BrokerClient::from(src.rpc_transport()))
89};