#include "shrpx_downstream_queue.h"
#include <cassert>
#include <limits>
#include "shrpx_downstream.h"
namespace shrpx {
DownstreamQueue::HostEntry::HostEntry(ImmutableString &&key)
: key(std::move(key)), num_active(0) {}
DownstreamQueue::DownstreamQueue(size_t conn_max_per_host, bool unified_host)
: conn_max_per_host_(conn_max_per_host == 0
? std::numeric_limits<size_t>::max()
: conn_max_per_host),
unified_host_(unified_host) {}
DownstreamQueue::~DownstreamQueue() {
dlist_delete_all(downstreams_);
for (auto &p : host_entries_) {
auto &ent = p.second;
dlist_delete_all(ent.blocked);
}
}
void DownstreamQueue::add_pending(std::unique_ptr<Downstream> downstream) {
downstream->set_dispatch_state(DispatchState::PENDING);
downstreams_.append(downstream.release());
}
void DownstreamQueue::mark_failure(Downstream *downstream) {
downstream->set_dispatch_state(DispatchState::FAILURE);
}
DownstreamQueue::HostEntry &
DownstreamQueue::find_host_entry(const std::string_view &host) {
auto itr = host_entries_.find(host);
if (itr == std::ranges::end(host_entries_)) {
auto key = ImmutableString{host};
auto key_ref = as_string_view(key);
std::tie(itr, std::ignore) =
host_entries_.emplace(key_ref, HostEntry(std::move(key)));
}
return (*itr).second;
}
std::string_view
DownstreamQueue::make_host_key(const std::string_view &host) const {
return unified_host_ ? ""sv : host;
}
std::string_view DownstreamQueue::make_host_key(Downstream *downstream) const {
return make_host_key(downstream->request().authority);
}
void DownstreamQueue::mark_active(Downstream *downstream) {
auto &ent = find_host_entry(make_host_key(downstream));
++ent.num_active;
downstream->set_dispatch_state(DispatchState::ACTIVE);
}
void DownstreamQueue::mark_blocked(Downstream *downstream) {
auto &ent = find_host_entry(make_host_key(downstream));
downstream->set_dispatch_state(DispatchState::BLOCKED);
auto link = new BlockedLink{};
downstream->attach_blocked_link(link);
ent.blocked.append(link);
}
bool DownstreamQueue::can_activate(const std::string_view &host) const {
auto itr = host_entries_.find(make_host_key(host));
if (itr == std::ranges::end(host_entries_)) {
return true;
}
auto &ent = (*itr).second;
return ent.num_active < conn_max_per_host_;
}
namespace {
bool remove_host_entry_if_empty(const DownstreamQueue::HostEntry &ent,
DownstreamQueue::HostEntryMap &host_entries,
const std::string_view &host) {
if (ent.blocked.empty() && ent.num_active == 0) {
host_entries.erase(host);
return true;
}
return false;
}
}
Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream,
bool next_blocked) {
auto delptr = std::unique_ptr<Downstream>(downstream);
downstreams_.remove(downstream);
auto host = make_host_key(downstream);
auto &ent = find_host_entry(host);
if (downstream->get_dispatch_state() == DispatchState::ACTIVE) {
--ent.num_active;
} else {
auto link = downstream->detach_blocked_link();
if (link) {
ent.blocked.remove(link);
delete link;
}
}
if (remove_host_entry_if_empty(ent, host_entries_, host)) {
return nullptr;
}
if (!next_blocked || ent.num_active >= conn_max_per_host_) {
return nullptr;
}
auto link = ent.blocked.head;
if (!link) {
return nullptr;
}
auto next_downstream = link->downstream;
auto link2 = next_downstream->detach_blocked_link();
(void)link2;
assert(link2 == link);
ent.blocked.remove(link);
delete link;
remove_host_entry_if_empty(ent, host_entries_, host);
return next_downstream;
}
Downstream *DownstreamQueue::get_downstreams() const {
return downstreams_.head;
}
}