#include <srs_app_ingest.hpp>
#ifdef SRS_AUTO_INGEST
#include <stdlib.h>
using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_app_config.hpp>
#include <srs_kernel_log.hpp>
#include <srs_app_ffmpeg.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_utility.hpp>
#define SRS_AUTO_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL)
SrsIngesterFFMPEG::SrsIngesterFFMPEG()
{
ffmpeg = NULL;
}
SrsIngesterFFMPEG::~SrsIngesterFFMPEG()
{
srs_freep(ffmpeg);
}
int SrsIngesterFFMPEG::initialize(SrsFFMPEG* ff, string v, string i)
{
int ret = ERROR_SUCCESS;
ffmpeg = ff;
vhost = v;
id = i;
starttime = srs_get_system_time_ms();
return ret;
}
string SrsIngesterFFMPEG::uri()
{
return vhost + "/" + id;
}
int SrsIngesterFFMPEG::alive()
{
return (int)(srs_get_system_time_ms() - starttime);
}
bool SrsIngesterFFMPEG::equals(string v)
{
return vhost == v;
}
bool SrsIngesterFFMPEG::equals(string v, string i)
{
return vhost == v && id == i;
}
int SrsIngesterFFMPEG::start()
{
return ffmpeg->start();
}
void SrsIngesterFFMPEG::stop()
{
ffmpeg->stop();
}
int SrsIngesterFFMPEG::cycle()
{
return ffmpeg->cycle();
}
void SrsIngesterFFMPEG::fast_stop()
{
ffmpeg->fast_stop();
}
SrsIngester::SrsIngester()
{
_srs_config->subscribe(this);
pthread = new SrsReusableThread("ingest", this, SRS_AUTO_INGESTER_SLEEP_US);
pprint = SrsPithyPrint::create_ingester();
}
SrsIngester::~SrsIngester()
{
_srs_config->unsubscribe(this);
srs_freep(pthread);
clear_engines();
}
int SrsIngester::start()
{
int ret = ERROR_SUCCESS;
if ((ret = parse()) != ERROR_SUCCESS) {
clear_engines();
ret = ERROR_SUCCESS;
return ret;
}
if ((ret = pthread->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
}
srs_trace("ingest thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id());
return ret;
}
int SrsIngester::parse_ingesters(SrsConfDirective* vhost)
{
int ret = ERROR_SUCCESS;
std::vector<SrsConfDirective*> ingesters = _srs_config->get_ingesters(vhost->arg0());
for (int i = 0; i < (int)ingesters.size(); i++) {
SrsConfDirective* ingest = ingesters[i];
if ((ret = parse_engines(vhost, ingest)) != ERROR_SUCCESS) {
return ret;
}
}
return ret;
}
int SrsIngester::parse_engines(SrsConfDirective* vhost, SrsConfDirective* ingest)
{
int ret = ERROR_SUCCESS;
if (!_srs_config->get_ingest_enabled(ingest)) {
return ret;
}
std::string ffmpeg_bin = _srs_config->get_ingest_ffmpeg(ingest);
if (ffmpeg_bin.empty()) {
ret = ERROR_ENCODER_PARSE;
srs_trace("empty ffmpeg ret=%d", ret);
return ret;
}
std::vector<SrsConfDirective*> engines = _srs_config->get_transcode_engines(ingest);
if (engines.empty()) {
SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, NULL)) != ERROR_SUCCESS) {
srs_freep(ffmpeg);
if (ret != ERROR_ENCODER_LOOP) {
srs_error("invalid ingest engine. ret=%d", ret);
}
return ret;
}
SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG();
if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) {
srs_freep(ingester);
return ret;
}
ingesters.push_back(ingester);
return ret;
}
for (int i = 0; i < (int)engines.size(); i++) {
SrsConfDirective* engine = engines[i];
SrsFFMPEG* ffmpeg = new SrsFFMPEG(ffmpeg_bin);
if ((ret = initialize_ffmpeg(ffmpeg, vhost, ingest, engine)) != ERROR_SUCCESS) {
srs_freep(ffmpeg);
if (ret != ERROR_ENCODER_LOOP) {
srs_error("invalid ingest engine: %s %s, ret=%d",
ingest->arg0().c_str(), engine->arg0().c_str(), ret);
}
return ret;
}
SrsIngesterFFMPEG* ingester = new SrsIngesterFFMPEG();
if ((ret = ingester->initialize(ffmpeg, vhost->arg0(), ingest->arg0())) != ERROR_SUCCESS) {
srs_freep(ingester);
return ret;
}
ingesters.push_back(ingester);
}
return ret;
}
void SrsIngester::dispose()
{
std::vector<SrsIngesterFFMPEG*>::iterator it;
for (it = ingesters.begin(); it != ingesters.end(); ++it) {
SrsIngesterFFMPEG* ingester = *it;
ingester->fast_stop();
}
if (!ingesters.empty()) {
srs_trace("fast stop all ingesters ok.");
}
stop();
}
void SrsIngester::stop()
{
pthread->stop();
clear_engines();
}
int SrsIngester::cycle()
{
int ret = ERROR_SUCCESS;
std::vector<SrsIngesterFFMPEG*>::iterator it;
for (it = ingesters.begin(); it != ingesters.end(); ++it) {
SrsIngesterFFMPEG* ingester = *it;
if ((ret = ingester->start()) != ERROR_SUCCESS) {
srs_error("ingest ffmpeg start failed. ret=%d", ret);
return ret;
}
if ((ret = ingester->cycle()) != ERROR_SUCCESS) {
srs_error("ingest ffmpeg cycle failed. ret=%d", ret);
return ret;
}
}
show_ingest_log_message();
return ret;
}
void SrsIngester::on_thread_stop()
{
}
void SrsIngester::clear_engines()
{
std::vector<SrsIngesterFFMPEG*>::iterator it;
for (it = ingesters.begin(); it != ingesters.end(); ++it) {
SrsIngesterFFMPEG* ingester = *it;
srs_freep(ingester);
}
ingesters.clear();
}
int SrsIngester::parse()
{
int ret = ERROR_SUCCESS;
std::vector<SrsConfDirective*> vhosts;
_srs_config->get_vhosts(vhosts);
for (int i = 0; i < (int)vhosts.size(); i++) {
SrsConfDirective* vhost = vhosts[i];
if ((ret = parse_ingesters(vhost)) != ERROR_SUCCESS) {
return ret;
}
}
return ret;
}
int SrsIngester::initialize_ffmpeg(SrsFFMPEG* ffmpeg, SrsConfDirective* vhost, SrsConfDirective* ingest, SrsConfDirective* engine)
{
int ret = ERROR_SUCCESS;
std::string port;
if (true) {
std::vector<std::string> ip_ports = _srs_config->get_listens();
srs_assert(ip_ports.size() > 0);
std::string ep = ip_ports[0];
std::string ip;
srs_parse_endpoint(ep, ip, port);
}
std::string output = _srs_config->get_engine_output(engine);
output = srs_string_replace(output, "[vhost]", vhost->arg0());
output = srs_string_replace(output, "[port]", port);
if (output.empty()) {
ret = ERROR_ENCODER_NO_OUTPUT;
srs_trace("empty output url, ingest=%s. ret=%d", ingest->arg0().c_str(), ret);
return ret;
}
std::string url = output;
std::string app, stream;
size_t pos = std::string::npos;
if ((pos = url.rfind("/")) != std::string::npos) {
stream = url.substr(pos + 1);
url = url.substr(0, pos);
}
if ((pos = url.rfind("/")) != std::string::npos) {
app = url.substr(pos + 1);
url = url.substr(0, pos);
}
if ((pos = app.rfind("?")) != std::string::npos) {
app = app.substr(0, pos);
}
std::string log_file = SRS_CONSTS_NULL_FILE; if (_srs_config->get_ffmpeg_log_enabled()) {
log_file = _srs_config->get_ffmpeg_log_dir();
log_file += "/";
log_file += "ffmpeg-ingest";
log_file += "-";
log_file += vhost->arg0();
log_file += "-";
log_file += app;
log_file += "-";
log_file += stream;
log_file += ".log";
}
std::string input_type = _srs_config->get_ingest_input_type(ingest);
if (input_type.empty()) {
ret = ERROR_ENCODER_NO_INPUT;
srs_trace("empty intput type, ingest=%s. ret=%d", ingest->arg0().c_str(), ret);
return ret;
}
if (srs_config_ingest_is_file(input_type)) {
std::string input_url = _srs_config->get_ingest_input_url(ingest);
if (input_url.empty()) {
ret = ERROR_ENCODER_NO_INPUT;
srs_trace("empty intput url, ingest=%s. ret=%d", ingest->arg0().c_str(), ret);
return ret;
}
ffmpeg->set_iparams("-re");
if ((ret = ffmpeg->initialize(input_url, output, log_file)) != ERROR_SUCCESS) {
return ret;
}
} else if (srs_config_ingest_is_stream(input_type)) {
std::string input_url = _srs_config->get_ingest_input_url(ingest);
if (input_url.empty()) {
ret = ERROR_ENCODER_NO_INPUT;
srs_trace("empty intput url, ingest=%s. ret=%d", ingest->arg0().c_str(), ret);
return ret;
}
ffmpeg->set_iparams("");
if ((ret = ffmpeg->initialize(input_url, output, log_file)) != ERROR_SUCCESS) {
return ret;
}
} else {
ret = ERROR_ENCODER_INPUT_TYPE;
srs_error("invalid ingest=%s type=%s, ret=%d",
ingest->arg0().c_str(), input_type.c_str(), ret);
}
ffmpeg->set_oformat("flv");
std::string vcodec = _srs_config->get_engine_vcodec(engine);
std::string acodec = _srs_config->get_engine_acodec(engine);
bool engine_disabled = !engine || !_srs_config->get_engine_enabled(engine);
if (engine_disabled || vcodec.empty() || acodec.empty()) {
if ((ret = ffmpeg->initialize_copy()) != ERROR_SUCCESS) {
return ret;
}
} else {
if ((ret = ffmpeg->initialize_transcode(engine)) != ERROR_SUCCESS) {
return ret;
}
}
srs_trace("parse success, ingest=%s, vhost=%s",
ingest->arg0().c_str(), vhost->arg0().c_str());
return ret;
}
void SrsIngester::show_ingest_log_message()
{
pprint->elapse();
if ((int)ingesters.size() <= 0) {
return;
}
int index = rand() % (int)ingesters.size();
SrsIngesterFFMPEG* ingester = ingesters.at(index);
if (pprint->can_print()) {
srs_trace("-> "SRS_CONSTS_LOG_INGESTER" time=%"PRId64", ingesters=%d, #%d(alive=%ds, %s)",
pprint->age(), (int)ingesters.size(), index, ingester->alive() / 1000, ingester->uri().c_str());
}
}
int SrsIngester::on_reload_vhost_added(string vhost)
{
int ret = ERROR_SUCCESS;
SrsConfDirective* _vhost = _srs_config->get_vhost(vhost);
if ((ret = parse_ingesters(_vhost)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("reload add vhost ingesters, vhost=%s", vhost.c_str());
return ret;
}
int SrsIngester::on_reload_vhost_removed(string vhost)
{
int ret = ERROR_SUCCESS;
std::vector<SrsIngesterFFMPEG*>::iterator it;
for (it = ingesters.begin(); it != ingesters.end();) {
SrsIngesterFFMPEG* ingester = *it;
if (!ingester->equals(vhost)) {
++it;
continue;
}
ingester->stop();
srs_trace("reload stop ingester, vhost=%s, id=%s", vhost.c_str(), ingester->uri().c_str());
srs_freep(ingester);
it = ingesters.erase(it);
}
return ret;
}
int SrsIngester::on_reload_ingest_removed(string vhost, string ingest_id)
{
int ret = ERROR_SUCCESS;
std::vector<SrsIngesterFFMPEG*>::iterator it;
for (it = ingesters.begin(); it != ingesters.end();) {
SrsIngesterFFMPEG* ingester = *it;
if (!ingester->equals(vhost, ingest_id)) {
++it;
continue;
}
ingester->stop();
srs_trace("reload stop ingester, vhost=%s, id=%s", vhost.c_str(), ingester->uri().c_str());
srs_freep(ingester);
it = ingesters.erase(it);
}
return ret;
}
int SrsIngester::on_reload_ingest_added(string vhost, string ingest_id)
{
int ret = ERROR_SUCCESS;
SrsConfDirective* _vhost = _srs_config->get_vhost(vhost);
SrsConfDirective* _ingester = _srs_config->get_ingest_by_id(vhost, ingest_id);
if ((ret = parse_engines(_vhost, _ingester)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("reload add ingester, "
"vhost=%s, id=%s", vhost.c_str(), ingest_id.c_str());
return ret;
}
int SrsIngester::on_reload_ingest_updated(string vhost, string ingest_id)
{
int ret = ERROR_SUCCESS;
if ((ret = on_reload_ingest_removed(vhost, ingest_id)) != ERROR_SUCCESS) {
return ret;
}
if ((ret = on_reload_ingest_added(vhost, ingest_id)) != ERROR_SUCCESS) {
return ret;
}
srs_trace("reload updated ingester, "
"vhost=%s, id=%s", vhost.c_str(), ingest_id.c_str());
return ret;
}
#endif