#define PUBSUB_PRIVATE
#include "lib/dispatch/dispatch.h"
#include "lib/dispatch/dispatch_cfg.h"
#include "lib/dispatch/dispatch_naming.h"
#include "lib/dispatch/msgtypes.h"
#include "lib/pubsub/pubsub_flags.h"
#include "lib/pubsub/pub_binding_st.h"
#include "lib/pubsub/pubsub_build.h"
#include "lib/pubsub/pubsub_builder_st.h"
#include "lib/pubsub/pubsub_connect.h"
#include "lib/container/smartlist.h"
#include "lib/log/util_bug.h"
#include "lib/malloc/malloc.h"
#include <string.h>
/**
 * Allocate a zero-filled pubsub_items_t and attach two newly created
 * smartlists to it: <b>items</b> (publish/subscribe entries) and
 * <b>type_items</b> (type registrations).
 *
 * Return the new object.  pubsub_items_free_() releases everything that is
 * set up here.
 */
static pubsub_items_t *
pubsub_items_new(void)
{
  pubsub_items_t *result = tor_malloc_zero(sizeof(*result));

  result->items = smartlist_new();
  result->type_items = smartlist_new();

  return result;
}
/**
 * Release <b>cfg</b> together with everything it holds: every element of
 * its <b>items</b> and <b>type_items</b> lists is passed to tor_free(),
 * then each list is freed, and finally the structure itself.
 *
 * Does nothing when <b>cfg</b> is NULL.
 */
void
pubsub_items_free_(pubsub_items_t *cfg)
{
  if (cfg == NULL) {
    return;
  }

  /* Publish/subscribe entries. */
  SMARTLIST_FOREACH_BEGIN(cfg->items, pubsub_cfg_t *, entry) {
    tor_free(entry);
  } SMARTLIST_FOREACH_END(entry);

  /* Type-registration entries. */
  SMARTLIST_FOREACH_BEGIN(cfg->type_items, pubsub_type_cfg_t *, type_entry) {
    tor_free(type_entry);
  } SMARTLIST_FOREACH_END(type_entry);

  smartlist_free(cfg->items);
  smartlist_free(cfg->type_items);
  tor_free(cfg);
}
/**
 * Create and return a new pubsub_builder_t.
 *
 * dispatch_naming_init() is called before anything is allocated.  The
 * returned builder is zero-filled apart from its <b>cfg</b> member (from
 * dcfg_new()) and its <b>items</b> member (from pubsub_items_new()).
 */
pubsub_builder_t *
pubsub_builder_new(void)
{
  pubsub_builder_t *builder;

  dispatch_naming_init();

  builder = tor_malloc_zero(sizeof(*builder));
  builder->cfg = dcfg_new();
  builder->items = pubsub_items_new();

  return builder;
}
/**
 * Release <b>pb</b>, along with the items collection and the dispatch
 * configuration it still references.
 *
 * Does nothing when <b>pb</b> is NULL.
 */
void
pubsub_builder_free_(pubsub_builder_t *pb)
{
  if (pb != NULL) {
    pubsub_items_free(pb->items);
    dcfg_free(pb->cfg);
    tor_free(pb);
  }
}
/**
 * Create a connector through which the subsystem identified by
 * <b>subsys</b> can add its declarations to <b>builder</b>.
 *
 * <b>builder</b> must not be NULL (asserted).  The builder's count of
 * outstanding connectors is incremented; pubsub_connector_free_()
 * decrements it again.
 *
 * Return the newly allocated connector.
 */
pubsub_connector_t *
pubsub_connector_for_subsystem(pubsub_builder_t *builder,
                               subsys_id_t subsys)
{
  pubsub_connector_t *connector;

  tor_assert(builder);
  builder->n_connectors += 1;

  connector = tor_malloc_zero(sizeof(*connector));
  connector->subsys_id = subsys;
  connector->builder = builder;

  return connector;
}
/**
 * Release the connector <b>con</b>.
 *
 * If the connector is attached to a builder, that builder's count of
 * outstanding connectors is decremented, and the count is asserted not to
 * have gone negative.
 *
 * Does nothing when <b>con</b> is NULL.
 */
void
pubsub_connector_free_(pubsub_connector_t *con)
{
  if (con == NULL) {
    return;
  }

  if (con->builder != NULL) {
    con->builder->n_connectors -= 1;
    tor_assert(con->builder->n_connectors >= 0);
  }

  tor_free(con);
}
/**
 * Record that the subsystem behind <b>con</b> wants to publish message
 * <b>msg</b>, carrying data of type <b>type</b>, on <b>channel</b>.
 *
 * <b>out</b> is cleared and its message template is filled in with the
 * sender, channel, message and type.  A pointer to <b>out</b> is kept in
 * the recorded entry, so <b>out</b> has to stay valid for as long as that
 * entry exists; the binding's dispatch pointer is assigned later, by
 * pubsub_items_install_bindings().
 *
 * <b>flags</b> is stored with the entry.  <b>file</b> and <b>line</b> are
 * stored as well, as the location the request was added from; only the
 * pointer <b>file</b> is kept, not a copy of the string.
 *
 * The entry is added to the builder's item list before the dispatch
 * configuration is updated, so it is recorded even when this function
 * fails.
 *
 * Return 0 on success.  If setting the message's type or channel in the
 * dispatch configuration fails, increment the builder's error count and
 * return -1.
 */
int
pubsub_add_pub_(pubsub_connector_t *con,
                pub_binding_t *out,
                channel_id_t channel,
                message_id_t msg,
                msg_type_id_t type,
                unsigned flags,
                const char *file,
                unsigned line)
{
  pubsub_cfg_t *item = tor_malloc_zero(sizeof(*item));

  memset(out, 0, sizeof(*out));

  /* Describe the request. */
  item->is_publish = true;
  item->subsys = con->subsys_id;
  item->channel = channel;
  item->msg = msg;
  item->type = type;
  item->flags = flags;
  item->added_by_file = file;
  item->added_by_line = line;

  /* Mirror the same identifiers into the caller's binding template. */
  out->msg_template.sender = item->subsys;
  out->msg_template.channel = item->channel;
  out->msg_template.msg = item->msg;
  out->msg_template.type = item->type;

  /* Remember the binding so that it can be pointed at the dispatcher once
   * one exists. */
  item->pub_binding = out;

  smartlist_add(con->builder->items->items, item);

  /* The channel is only set if setting the type succeeded. */
  if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0 ||
      dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0) {
    ++con->builder->n_errors;
    return -1;
  }

  return 0;
}
/**
 * Record that the subsystem behind <b>con</b> wants to receive message
 * <b>msg</b>, carrying data of type <b>type</b>, on <b>channel</b>, with
 * <b>recv_fn</b> as the handler.
 *
 * <b>flags</b> is stored with the entry.  When DISP_FLAG_STUB is set in
 * <b>flags</b>, the subscription is recorded but <b>recv_fn</b> is not
 * registered with the dispatch configuration.  <b>file</b> and
 * <b>line</b> are stored as the location the request was added from; only
 * the pointer <b>file</b> is kept, not a copy of the string.
 *
 * The entry is added to the builder's item list before the dispatch
 * configuration is updated, so it is recorded even when this function
 * fails.
 *
 * Return 0 on success.  If setting the message's type or channel fails,
 * or registering the receiver fails, increment the builder's error count
 * and return -1.
 */
int
pubsub_add_sub_(pubsub_connector_t *con,
                recv_fn_t recv_fn,
                channel_id_t channel,
                message_id_t msg,
                msg_type_id_t type,
                unsigned flags,
                const char *file,
                unsigned line)
{
  const bool is_stub = (flags & DISP_FLAG_STUB) != 0;
  pubsub_cfg_t *item = tor_malloc_zero(sizeof(*item));

  /* Describe the request. */
  item->is_publish = false;
  item->subsys = con->subsys_id;
  item->channel = channel;
  item->msg = msg;
  item->type = type;
  item->flags = flags;
  item->added_by_file = file;
  item->added_by_line = line;
  item->recv_fn = recv_fn;

  smartlist_add(con->builder->items->items, item);

  /* Each step runs only if the previous one succeeded; the receiver is
   * skipped entirely for stub subscriptions. */
  if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0 ||
      dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0 ||
      (!is_stub &&
       dcfg_add_recv(con->builder->cfg, msg, item->subsys, recv_fn) < 0)) {
    ++con->builder->n_errors;
    return -1;
  }

  return 0;
}
/**
 * Record that the subsystem behind <b>con</b> supplies the functions
 * <b>fns</b> for message type <b>type</b>, and pass them on to the
 * dispatch configuration.
 *
 * The contents of <b>fns</b> are copied into the recorded entry.
 * <b>file</b> and <b>line</b> are stored as the location the registration
 * was added from; only the pointer <b>file</b> is kept, not a copy of the
 * string.
 *
 * The entry is added to the builder's list of type items before the
 * dispatch configuration is updated, so it is recorded even when this
 * function fails.
 *
 * Return 0 on success.  If dcfg_type_set_fns() fails, increment the
 * builder's error count and return -1.
 */
int
pubsub_connector_register_type_(pubsub_connector_t *con,
                                msg_type_id_t type,
                                dispatch_typefns_t *fns,
                                const char *file,
                                unsigned line)
{
  pubsub_type_cfg_t *entry = tor_malloc_zero(sizeof(*entry));

  entry->type = type;
  memcpy(&entry->fns, fns, sizeof(*fns));
  entry->subsys = con->subsys_id;
  entry->added_by_file = file;
  entry->added_by_line = line;

  smartlist_add(con->builder->items->type_items, entry);

  if (dcfg_type_set_fns(con->builder->cfg, type, fns) < 0) {
    ++con->builder->n_errors;
    return -1;
  }

  return 0;
}
/**
 * For every entry in <b>items</b> that has a publish binding attached, set
 * that binding's dispatch pointer to <b>d</b>.  Entries without a binding
 * are left alone.
 */
static void
pubsub_items_install_bindings(pubsub_items_t *items,
                              dispatch_t *d)
{
  SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, entry) {
    if (entry->pub_binding != NULL) {
      entry->pub_binding->dispatch_ptr = d;
    }
  } SMARTLIST_FOREACH_END(entry);
}
/**
 * For every entry in <b>items</b> that has a publish binding attached,
 * reset that binding's dispatch pointer to NULL.  This undoes what
 * pubsub_items_install_bindings() set up.
 *
 * <b>items</b> is dereferenced without a check, so it must not be NULL.
 */
void
pubsub_items_clear_bindings(pubsub_items_t *items)
{
  SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, entry) {
    if (entry->pub_binding != NULL) {
      entry->pub_binding->dispatch_ptr = NULL;
    }
  } SMARTLIST_FOREACH_END(entry);
}
/**
 * Turn <b>builder</b> into a dispatcher.
 *
 * <b>builder</b> is passed to pubsub_builder_free() on every path, whether
 * or not a dispatcher is produced, so the caller must not use it
 * afterwards.  All connectors are expected to have been freed already; a
 * nonfatal assertion checks this.
 *
 * A dispatcher is only created when pubsub_builder_check() succeeds and
 * no error was counted while the builder was being configured.  On
 * success, every recorded publish binding is pointed at the new
 * dispatcher.
 *
 * If <b>items_out</b> is not NULL and a dispatcher was created, the
 * builder's items collection is handed over through *<b>items_out</b>
 * and detached from the builder.  On failure *<b>items_out</b> is left
 * untouched.
 *
 * Return the new dispatcher, or NULL on failure.
 */
dispatch_t *
pubsub_builder_finalize(pubsub_builder_t *builder,
                        pubsub_items_t **items_out)
{
  dispatch_t *result = NULL;

  tor_assert_nonfatal(builder->n_connectors == 0);

  if (pubsub_builder_check(builder) < 0) {
    goto done;
  }

  if (builder->n_errors != 0) {
    log_warn(LD_GENERAL, "At least one error occurred previously when "
             "configuring the dispatcher.");
    goto done;
  }

  result = dispatch_new(builder->cfg);
  if (result != NULL) {
    pubsub_items_install_bindings(builder->items, result);

    if (items_out != NULL) {
      /* Hand the items to the caller and detach them from the builder
       * before the builder is released below. */
      *items_out = builder->items;
      builder->items = NULL;
    }
  }

 done:
  pubsub_builder_free(builder);
  return result;
}