#ifndef SHRPX_WORKER_H
#define SHRPX_WORKER_H
#include "shrpx.h"
#include <mutex>
#include <vector>
#include <random>
#include <unordered_map>
#include <deque>
#include <thread>
#include <queue>
#ifndef NOTHREADS
# include <future>
#endif
#include "ssl_compat.h"
#ifdef NGHTTP2_OPENSSL_IS_WOLFSSL
# include <wolfssl/options.h>
# include <wolfssl/openssl/ssl.h>
# include <wolfssl/openssl/err.h>
#else
# include <openssl/ssl.h>
# include <openssl/err.h>
#endif
#include <ev.h>
#include "shrpx_config.h"
#include "shrpx_downstream_connection_pool.h"
#include "memchunk.h"
#include "shrpx_tls.h"
#include "shrpx_live_check.h"
#include "shrpx_connect_blocker.h"
#include "shrpx_dns_tracker.h"
#ifdef ENABLE_HTTP3
# include "shrpx_quic_connection_handler.h"
# include "shrpx_quic.h"
#endif #include "allocator.h"
using namespace nghttp2;
namespace shrpx {
class Http2Session;
class ConnectBlocker;
struct UpstreamAddr;
class ConnectionHandler;
class AcceptHandler;
#ifdef ENABLE_HTTP3
class QUICListener;
#endif
#ifdef HAVE_MRUBY
namespace mruby {
class MRubyContext;
} #endif
namespace tls {
class CertLookupTree;
}
struct WeightGroup;
struct DownstreamAddr {
Address addr;
std::string_view host;
std::string_view hostport;
uint16_t port;
bool host_unix;
std::string_view sni;
std::unique_ptr<ConnectBlocker> connect_blocker;
std::unique_ptr<LiveCheck> live_check;
std::unique_ptr<DownstreamConnectionPool> dconn_pool;
size_t fall;
size_t rise;
tls::TLSSessionCache tls_session_cache;
DList<Http2Session> http2_extra_freelist;
WeightGroup *wg;
size_t num_dconn;
size_t seq;
Proto proto;
uint32_t cycle;
uint32_t pending_penalty;
uint32_t weight;
std::string_view group;
uint32_t group_weight;
uint32_t affinity_hash;
bool tls;
bool dns;
bool upgrade_scheme;
bool queued;
};
inline constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256;
struct DownstreamAddrEntry {
DownstreamAddr *addr;
size_t seq;
uint32_t cycle;
};
struct DownstreamAddrEntryGreater {
bool operator()(const DownstreamAddrEntry &lhs,
const DownstreamAddrEntry &rhs) const {
auto d = lhs.cycle - rhs.cycle;
if (d == 0) {
return rhs.seq < lhs.seq;
}
return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1;
}
};
struct WeightGroup {
std::priority_queue<DownstreamAddrEntry, std::vector<DownstreamAddrEntry>,
DownstreamAddrEntryGreater>
pq;
std::string_view name;
size_t seq;
uint32_t weight;
uint32_t cycle;
uint32_t pending_penalty;
bool queued;
};
struct WeightGroupEntry {
WeightGroup *wg;
size_t seq;
uint32_t cycle;
};
struct WeightGroupEntryGreater {
bool operator()(const WeightGroupEntry &lhs,
const WeightGroupEntry &rhs) const {
auto d = lhs.cycle - rhs.cycle;
if (d == 0) {
return rhs.seq < lhs.seq;
}
return d <= 2 * MAX_DOWNSTREAM_ADDR_WEIGHT - 1;
}
};
struct SharedDownstreamAddr {
SharedDownstreamAddr()
: balloc(1024, 1024),
affinity{SessionAffinity::NONE},
redirect_if_not_tls{false},
dnf{false},
timeout{} {}
SharedDownstreamAddr(const SharedDownstreamAddr &) = delete;
SharedDownstreamAddr(SharedDownstreamAddr &&) = delete;
SharedDownstreamAddr &operator=(const SharedDownstreamAddr &) = delete;
SharedDownstreamAddr &operator=(SharedDownstreamAddr &&) = delete;
BlockAllocator balloc;
std::vector<DownstreamAddr> addrs;
std::vector<WeightGroup> wgs;
std::priority_queue<WeightGroupEntry, std::vector<WeightGroupEntry>,
WeightGroupEntryGreater>
pq;
std::vector<AffinityHash> affinity_hash;
std::unordered_map<uint32_t, size_t> affinity_hash_map;
#ifdef HAVE_MRUBY
std::shared_ptr<mruby::MRubyContext> mruby_ctx;
#endif AffinityConfig affinity;
bool redirect_if_not_tls;
bool dnf;
struct {
ev_tstamp read;
ev_tstamp write;
} timeout;
};
struct DownstreamAddrGroup {
DownstreamAddrGroup();
~DownstreamAddrGroup();
DownstreamAddrGroup(const DownstreamAddrGroup &) = delete;
DownstreamAddrGroup(DownstreamAddrGroup &&) = delete;
DownstreamAddrGroup &operator=(const DownstreamAddrGroup &) = delete;
DownstreamAddrGroup &operator=(DownstreamAddrGroup &&) = delete;
ImmutableString pattern;
std::shared_ptr<SharedDownstreamAddr> shared_addr;
bool retired;
};
struct WorkerStat {
size_t num_connections;
size_t num_close_waits;
};
#ifdef ENABLE_HTTP3
struct QUICPacket {
QUICPacket(size_t upstream_addr_index, const Address &remote_addr,
const Address &local_addr, const ngtcp2_pkt_info &pi,
std::span<const uint8_t> data)
: upstream_addr_index{upstream_addr_index},
remote_addr{remote_addr},
local_addr{local_addr},
pi{pi},
data{std::ranges::begin(data), std::ranges::end(data)} {}
QUICPacket() : upstream_addr_index{}, remote_addr{}, local_addr{}, pi{} {}
size_t upstream_addr_index;
Address remote_addr;
Address local_addr;
ngtcp2_pkt_info pi;
std::vector<uint8_t> data;
};
#endif
enum class WorkerEventType {
REOPEN_LOG = 0x02,
GRACEFUL_SHUTDOWN = 0x03,
REPLACE_DOWNSTREAM = 0x04,
#ifdef ENABLE_HTTP3
QUIC_PKT_FORWARD = 0x05,
#endif };
struct WorkerEvent {
WorkerEventType type;
std::shared_ptr<DownstreamConfig> downstreamconf;
#ifdef ENABLE_HTTP3
std::unique_ptr<QUICPacket> quic_pkt;
#endif };
class Worker {
public:
Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
tls::CertLookupTree *cert_tree,
#ifdef ENABLE_HTTP3
SSL_CTX *quic_sv_ssl_ctx, tls::CertLookupTree *quic_cert_tree,
WorkerID wid,
#endif size_t index, const std::shared_ptr<TicketKeys> &ticket_keys,
ConnectionHandler *conn_handler,
std::shared_ptr<DownstreamConfig> downstreamconf);
~Worker();
void run_async();
void wait();
void process_events();
void send(WorkerEvent event);
tls::CertLookupTree *get_cert_lookup_tree() const;
#ifdef ENABLE_HTTP3
tls::CertLookupTree *get_quic_cert_lookup_tree() const;
#endif
std::shared_ptr<TicketKeys> get_ticket_keys();
void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);
WorkerStat *get_worker_stat();
struct ev_loop *get_loop() const;
SSL_CTX *get_sv_ssl_ctx() const;
SSL_CTX *get_cl_ssl_ctx() const;
#ifdef ENABLE_HTTP3
SSL_CTX *get_quic_sv_ssl_ctx() const;
#endif
void set_graceful_shutdown(bool f);
bool get_graceful_shutdown() const;
MemchunkPool *get_mcpool();
void schedule_clear_mcpool();
std::mt19937 &get_randgen();
#ifdef HAVE_MRUBY
int create_mruby_context();
mruby::MRubyContext *get_mruby_context() const;
#endif
std::vector<std::shared_ptr<DownstreamAddrGroup>> &
get_downstream_addr_groups();
ConnectBlocker *get_connect_blocker() const;
const DownstreamConfig *get_downstream_config() const;
void
replace_downstream_config(std::shared_ptr<DownstreamConfig> downstreamconf);
ConnectionHandler *get_connection_handler() const;
int setup_server_socket();
void delete_listener();
void accept_pending_connection();
int create_tcp_server_socket(UpstreamAddr &addr);
void enable_listener();
void disable_listener();
void sleep_listener(ev_tstamp t);
#ifdef ENABLE_HTTP3
QUICConnectionHandler *get_quic_connection_handler();
int setup_quic_server_socket();
const WorkerID &get_worker_id() const;
# ifdef HAVE_LIBBPF
bool should_attach_bpf() const;
bool should_update_bpf_map() const;
uint32_t compute_sk_index() const;
# endif
int create_quic_server_socket(UpstreamAddr &addr);
const UpstreamAddr *find_quic_upstream_addr(const Address &local_addr);
#endif
DNSTracker *get_dns_tracker();
int handle_connection(int fd, sockaddr *addr, socklen_t addrlen,
const UpstreamAddr *faddr);
private:
#ifndef NOTHREADS
std::future<void> fut_;
#endif size_t index_;
std::mutex m_;
std::deque<WorkerEvent> q_;
std::mt19937 randgen_;
ev_async w_;
ev_timer mcpool_clear_timer_;
ev_timer proc_wev_timer_;
ev_timer disable_listener_timer_;
MemchunkPool mcpool_;
WorkerStat worker_stat_;
DNSTracker dns_tracker_;
std::vector<UpstreamAddr> upstream_addrs_;
std::vector<std::unique_ptr<AcceptHandler>> listeners_;
#ifdef ENABLE_HTTP3
WorkerID worker_id_;
std::vector<UpstreamAddr> quic_upstream_addrs_;
std::vector<std::unique_ptr<QUICListener>> quic_listeners_;
#endif
std::shared_ptr<DownstreamConfig> downstreamconf_;
#ifdef HAVE_MRUBY
std::unique_ptr<mruby::MRubyContext> mruby_ctx_;
#endif struct ev_loop *loop_;
SSL_CTX *sv_ssl_ctx_;
SSL_CTX *cl_ssl_ctx_;
tls::CertLookupTree *cert_tree_;
ConnectionHandler *conn_handler_;
#ifdef ENABLE_HTTP3
SSL_CTX *quic_sv_ssl_ctx_;
tls::CertLookupTree *quic_cert_tree_;
QUICConnectionHandler quic_conn_handler_;
#endif
#ifdef HAVE_ATOMIC_STD_SHARED_PTR
std::atomic<std::shared_ptr<TicketKeys>> ticket_keys_;
#else
std::mutex ticket_keys_m_;
std::shared_ptr<TicketKeys> ticket_keys_;
#endif std::vector<std::shared_ptr<DownstreamAddrGroup>> downstream_addr_groups_;
std::unique_ptr<ConnectBlocker> connect_blocker_;
bool graceful_shutdown_;
};
size_t match_downstream_addr_group(
const RouterConfig &routerconfig, const std::string_view &hostport,
const std::string_view &path,
const std::vector<std::shared_ptr<DownstreamAddrGroup>> &groups,
size_t catch_all, BlockAllocator &balloc);
void downstream_failure(DownstreamAddr *addr, const Address *raddr);
}
#endif