#include "FBS/rtpParameters.h"
#include "FBS/transport.h"
#include "RTC/RTP/Packet.hpp"
#include "RTC/RTP/RtpStream.hpp"
#include "RTC/RTP/RtpStreamRecv.hpp"
#include "RTC/RTP/SharedPacket.hpp"
#include "RTC/RtpDictionaries.hpp"
#include "RTC/SimpleConsumer.hpp"
#include "flatbuffers/buffer.h"
#include "mocks/include/MockShared.hpp"
#include <catch2/catch_test_macros.hpp>
namespace
{
const uint8_t payloadType = 111;
mocks::MockShared shared(
[]()
{
return 1000;
});
class RtpStreamRecvListener : public RTC::RTP::RtpStreamRecv::Listener
{
public:
void OnRtpStreamScore(
RTC::RTP::RtpStream* , uint8_t , uint8_t ) override
{
}
void OnRtpStreamSendRtcpPacket(RTC::RTP::RtpStreamRecv* rtpStream, RTC::RTCP::Packet* packet) override
{
}
void OnRtpStreamNeedWorstRemoteFractionLost(
RTC::RTP::RtpStreamRecv* , uint8_t& ) override
{
}
};
class ConsumerListener : public RTC::Consumer::Listener
{
void OnConsumerSendRtpPacket(RTC::Consumer* , RTC::RTP::Packet* packet) final
{
this->sent.push_back(packet->GetSequenceNumber());
};
void OnConsumerRetransmitRtpPacket(RTC::Consumer* consumer, RTC::RTP::Packet* packet) final
{
}
void OnConsumerKeyFrameRequested(RTC::Consumer* consumer, uint32_t mappedSsrc) final {};
void OnConsumerNeedBitrateChange(RTC::Consumer* consumer) final {};
void OnConsumerNeedZeroBitrate(RTC::Consumer* consumer) final {};
void OnConsumerProducerClosed(RTC::Consumer* consumer) final {};
public:
void Verify(size_t size)
{
REQUIRE(this->sent.size() == size);
if (this->sent.size() <= 1)
{
return;
}
auto currentSeq = this->sent[0];
for (auto it = std::next(this->sent.begin()); it != this->sent.end(); ++it)
{
REQUIRE(*it == currentSeq + 1);
currentSeq = *it;
}
}
private:
std::vector<uint16_t> sent;
};
flatbuffers::Offset<::flatbuffers::Vector<::flatbuffers::Offset<FBS::RtpParameters::RtpEncodingParameters>>> createRtpEncodingParameters(
flatbuffers::FlatBufferBuilder& builder)
{
std::vector<flatbuffers::Offset<FBS::RtpParameters::RtpEncodingParameters>> encodings;
auto encoding = RTC::RtpEncodingParameters();
encoding.ssrc = 1234567890;
encodings.emplace_back(encoding.FillBuffer(builder));
return builder.CreateVector(encodings);
};
flatbuffers::Offset<FBS::RtpParameters::RtpParameters> createRtpParameters(
flatbuffers::FlatBufferBuilder& builder)
{
auto rtpParameters = RTC::RtpParameters();
auto codec = RTC::RtpCodecParameters();
auto encoding = RTC::RtpEncodingParameters();
codec.mimeType.SetMimeType("audio/opus");
codec.payloadType = payloadType;
encoding.ssrc = 1234567890;
rtpParameters.mid = "mid";
rtpParameters.codecs.emplace_back(codec);
rtpParameters.encodings.emplace_back(encoding);
rtpParameters.headerExtensions = std::vector<RTC::RtpHeaderExtensionParameters>();
return rtpParameters.FillBuffer(builder);
};
std::unique_ptr<RTC::SimpleConsumer> createConsumer(ConsumerListener* listener)
{
flatbuffers::FlatBufferBuilder bufferBuilder;
auto consumerId = bufferBuilder.CreateString("consumerId");
auto producerId = bufferBuilder.CreateString("producerId");
auto rtpParameters = createRtpParameters(bufferBuilder);
auto consumableEncodings = createRtpEncodingParameters(bufferBuilder);
auto consumeRequestBuilder = FBS::Transport::ConsumeRequestBuilder(bufferBuilder);
consumeRequestBuilder.add_consumerId(consumerId);
consumeRequestBuilder.add_producerId(producerId);
consumeRequestBuilder.add_kind(FBS::RtpParameters::MediaKind::AUDIO);
consumeRequestBuilder.add_rtpParameters(rtpParameters);
consumeRequestBuilder.add_type(FBS::RtpParameters::Type::SIMPLE);
consumeRequestBuilder.add_consumableRtpEncodings(consumableEncodings);
consumeRequestBuilder.add_paused(false);
consumeRequestBuilder.add_preferredLayers(0);
consumeRequestBuilder.add_ignoreDtx(false);
auto offset = consumeRequestBuilder.Finish();
bufferBuilder.Finish(offset);
auto* buf = bufferBuilder.GetBufferPointer();
const auto* consumeRequest = flatbuffers::GetRoot<FBS::Transport::ConsumeRequest>(buf);
return std::make_unique<RTC::SimpleConsumer>(
std::addressof(shared),
consumeRequest->consumerId()->str(),
consumeRequest->producerId()->str(),
listener,
consumeRequest);
}
std::unique_ptr<RTC::RTP::RtpStreamRecv> createRtpStreamRecv()
{
RtpStreamRecvListener streamRecvListener;
RTC::RTP::RtpStream::Params params;
return std::make_unique<RTC::RTP::RtpStreamRecv>(
&streamRecvListener, std::addressof(shared), params, 0u, false);
}
class Fixture
{
public:
Fixture()
: listener(std::make_unique<ConsumerListener>()),
consumer(createConsumer(listener.get())),
rtpStream(createRtpStreamRecv())
{
const std::vector<uint8_t> scores{ 10 };
consumer->ProducerRtpStreamScores(&scores);
consumer->ProducerNewRtpStream(rtpStream.get(), 1234567890);
}
std::unique_ptr<ConsumerListener> listener;
std::unique_ptr<RTC::SimpleConsumer> consumer;
std::unique_ptr<RTC::RTP::RtpStreamRecv> rtpStream;
};
}
SCENARIO("SimpleConsumer", "[rtp][consumer]")
{
alignas(4) uint8_t buffer[] =
{
0x80, 0x01, 0x00, 0x08,
0x00, 0x00, 0x00, 0x04,
0x49, 0x96, 0x02, 0xD2, 0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF,
};
const size_t originalPacketLength{ 16 };
SECTION("RTP packets are not forwarded when the consumer is not active")
{
Fixture fixture;
auto* packet = RTC::RTP::Packet::Parse(buffer, originalPacketLength + 64);
RTC::RTP::SharedPacket sharedPacket(packet);
packet->SetPayloadType(payloadType);
fixture.consumer->SendRtpPacket(packet, sharedPacket);
fixture.listener->Verify(0);
delete packet;
}
SECTION("RTP packets are not forwarded for unsupported payload types")
{
Fixture fixture;
dynamic_cast<RTC::Consumer*>(fixture.consumer.get())->TransportConnected();
auto* packet = RTC::RTP::Packet::Parse(buffer, originalPacketLength + 64);
RTC::RTP::SharedPacket sharedPacket(packet);
packet->SetPayloadType(payloadType + 1);
fixture.consumer->SendRtpPacket(packet, sharedPacket);
fixture.listener->Verify(0);
delete packet;
}
SECTION("RTP packets with empty payload are not forwarded")
{
Fixture fixture;
dynamic_cast<RTC::Consumer*>(fixture.consumer.get())->TransportConnected();
auto* packet = RTC::RTP::Packet::Parse(buffer, originalPacketLength + 0);
RTC::RTP::SharedPacket sharedPacket(packet);
packet->SetPayloadType(payloadType + 1);
fixture.consumer->SendRtpPacket(packet, sharedPacket);
fixture.listener->Verify(0);
delete packet;
}
SECTION("outgoing RTP packets are forwarded with increased sequence number")
{
Fixture fixture;
dynamic_cast<RTC::Consumer*>(fixture.consumer.get())->TransportConnected();
auto* packet = RTC::RTP::Packet::Parse(buffer, originalPacketLength + 64);
RTC::RTP::SharedPacket sharedPacket(packet);
uint16_t seq{ 1 };
packet->SetSequenceNumber(seq++);
packet->SetPayloadType(payloadType);
sharedPacket.Assign(packet);
fixture.consumer->SendRtpPacket(packet, sharedPacket);
packet->SetSequenceNumber(seq++);
sharedPacket.Assign(packet);
fixture.consumer->SendRtpPacket(packet, sharedPacket);
packet->SetSequenceNumber(seq++);
sharedPacket.Assign(packet);
fixture.consumer->SendRtpPacket(packet, sharedPacket);
packet->SetSequenceNumber(seq++);
packet->RemovePayload();
sharedPacket.Assign(packet);
fixture.consumer->SendRtpPacket(packet, sharedPacket);
fixture.listener->Verify(3);
delete packet;
}
}