1mod aws_lambda;
2mod fronted_http;
3mod race;
4
5use anyctx::AnyCtx;
6use anyhow::Context;
7
8use aws_lambda::AwsLambdaTransport;
9use fronted_http::FrontedHttpTransport;
10use geph5_broker_protocol::BrokerClient;
11use itertools::Itertools;
12use nanorpc::DynRpcTransport;
13use race::RaceTransport;
14use reqwest::Client;
15use serde::{Deserialize, Serialize};
16use sillad::tcp::TcpDialer;
17use std::net::SocketAddr;
18
19use crate::client::{Config, CtxField};
20
21#[derive(Serialize, Deserialize, Clone)]
22#[serde(rename_all = "snake_case")]
23pub enum BrokerSource {
24 Direct(String),
25 Fronted {
26 front: String,
27 host: String,
28 },
29 DirectTcp(SocketAddr),
30 AwsLambda {
31 function_name: String,
32 region: String,
33 access_key_id: String,
34 secret_access_key: String,
35 },
36 Race(Vec<BrokerSource>),
37}
38
39impl BrokerSource {
40 pub fn rpc_transport(&self) -> DynRpcTransport {
42 let client = Client::builder().no_proxy().build().unwrap();
43 match self {
44 BrokerSource::Direct(s) => DynRpcTransport::new(FrontedHttpTransport {
45 url: s.clone(),
46 host: None,
47 client,
48 }),
49 BrokerSource::DirectTcp(dest_addr) => {
50 DynRpcTransport::new(nanorpc_sillad::DialerTransport(TcpDialer {
51 dest_addr: *dest_addr,
52 }))
53 }
54 BrokerSource::Fronted { front, host } => DynRpcTransport::new(FrontedHttpTransport {
55 url: front.clone(),
56 host: Some(host.clone()),
57 client,
58 }),
59 BrokerSource::AwsLambda {
60 function_name,
61 region,
62 access_key_id,
63 secret_access_key,
64 } => DynRpcTransport::new(AwsLambdaTransport {
65 function_name: function_name.clone(),
66 region: region.clone(),
67 access_key_id: access_key_id.clone(),
68 secret_access_key: secret_access_key.clone(),
69 }),
70 BrokerSource::Race(race_between) => {
71 let transports = race_between
72 .iter()
73 .map(|bs| bs.rpc_transport())
74 .collect_vec();
75 DynRpcTransport::new(RaceTransport::new(transports))
76 }
77 }
78 }
79}
80
81pub fn broker_client(ctx: &AnyCtx<Config>) -> anyhow::Result<&BrokerClient> {
82 ctx.get(BROKER_CLIENT).as_ref().context(
83 "broker information not provided, so cannot use any broker-dependent functionality",
84 )
85}
86
87static BROKER_CLIENT: CtxField<Option<BrokerClient>> = |ctx| {
88 ctx.init()
89 .broker
90 .as_ref()
91 .map(|src| BrokerClient::from(src.rpc_transport()))
92};