#include "config.h"
#include <assert.h>
#include <errno.h>
#include <string.h>
#ifdef WITH_BROKER
# include "mosquitto_broker_internal.h"
# ifdef WITH_WEBSOCKETS
# include <libwebsockets.h>
# endif
#else
# include "read_handle.h"
#endif
#include "memory_mosq.h"
#include "mqtt_protocol.h"
#include "net_mosq.h"
#include "packet_mosq.h"
#include "read_handle.h"
#include "util_mosq.h"
#ifdef WITH_BROKER
# include "sys_tree.h"
# include "send_mosq.h"
#else
# define G_BYTES_RECEIVED_INC(A)
# define G_BYTES_SENT_INC(A)
# define G_MSGS_SENT_INC(A)
# define G_PUB_MSGS_SENT_INC(A)
#endif
/*
 * Allocate the wire buffer for an outgoing packet and write its fixed header.
 *
 * On entry packet->command and packet->remaining_length must be set. The
 * remaining length is encoded as a variable byte integer (7 data bits per
 * byte, least significant group first, top bit set when more bytes follow).
 *
 * On success packet->payload holds the command byte followed by the encoded
 * length, packet->packet_length is the total buffer size, and packet->pos
 * indexes the first byte after the fixed header.
 *
 * Returns:
 *   MOSQ_ERR_SUCCESS      - buffer allocated and header written.
 *   MOSQ_ERR_PAYLOAD_SIZE - the length needed five encoded bytes.
 *   MOSQ_ERR_NOMEM        - allocation failed.
 */
int packet__alloc(struct mosquitto__packet *packet)
{
	uint8_t varint[5];
	uint8_t digit;
	uint32_t left;

	assert(packet);

	packet->payload = NULL;
	packet->remaining_count = 0;
	left = packet->remaining_length;

	for(;;){
		digit = left % 128;
		left = left / 128;
		if(left > 0){
			/* More groups follow, so flag this byte as a continuation. */
			digit = digit | 0x80;
		}
		varint[packet->remaining_count] = digit;
		packet->remaining_count++;
		if(left == 0 || packet->remaining_count >= 5){
			break;
		}
	}
	if(packet->remaining_count == 5){
		return MOSQ_ERR_PAYLOAD_SIZE;
	}

	/* Total size: command byte + encoded length bytes + remaining length. */
	packet->packet_length = packet->remaining_length + 1 + (uint8_t)packet->remaining_count;
#ifdef WITH_WEBSOCKETS
	/* Websocket builds reserve an extra LWS_PRE bytes in the allocation. */
	packet->payload = mosquitto__malloc(sizeof(uint8_t)*packet->packet_length + LWS_PRE);
#else
	packet->payload = mosquitto__malloc(sizeof(uint8_t)*packet->packet_length);
#endif
	if(packet->payload == NULL){
		return MOSQ_ERR_NOMEM;
	}

	packet->payload[0] = packet->command;
	memcpy(&packet->payload[1], varint, (size_t)packet->remaining_count);
	packet->pos = 1U + (uint8_t)packet->remaining_count;

	return MOSQ_ERR_SUCCESS;
}
/*
 * Release a packet's payload buffer and reset its parsing/progress fields.
 *
 * The packet structure itself is not freed. A NULL packet is ignored.
 * remaining_mult is reset to 1 (not 0) because it is the multiplier applied
 * to the first byte of the next variable length value that gets decoded.
 */
void packet__cleanup(struct mosquitto__packet *packet)
{
	if(packet == NULL){
		return;
	}

	/* Drop the buffer and make sure no stale pointer is left behind. */
	mosquitto__free(packet->payload);
	packet->payload = NULL;

	/* Header state. */
	packet->command = 0;
	packet->remaining_length = 0;
	packet->remaining_mult = 1;
	packet->remaining_count = 0;

	/* Transfer progress. */
	packet->pos = 0;
	packet->to_process = 0;
}
/*
 * Free every outgoing packet (the in-flight one and everything queued behind
 * it) and reset the incoming packet state.
 *
 * No mutexes are taken here; this function does not lock anything itself.
 *
 * On return mosq->current_out_packet, mosq->out_packet and
 * mosq->out_packet_last are all NULL, and mosq->in_packet has been passed
 * through packet__cleanup().
 */
void packet__cleanup_all_no_locks(struct mosquitto *mosq)
{
	struct mosquitto__packet *packet;

	/* If nothing is in flight, promote the head of the queue so the loop
	 * below can treat "current + queue" uniformly. */
	if(mosq->out_packet && !mosq->current_out_packet){
		mosq->current_out_packet = mosq->out_packet;
		mosq->out_packet = mosq->out_packet->next;
	}
	while(mosq->current_out_packet){
		packet = mosq->current_out_packet;
		/* Advance to the next queued packet before freeing this one. */
		mosq->current_out_packet = mosq->out_packet;
		if(mosq->out_packet){
			mosq->out_packet = mosq->out_packet->next;
		}

		packet__cleanup(packet);
		mosquitto__free(packet);
	}
	/* The queue is now empty, so the tail pointer must not keep pointing at a
	 * packet that was just freed. */
	mosq->out_packet_last = NULL;

	packet__cleanup(&mosq->in_packet);
}
/*
 * Locked wrapper around packet__cleanup_all_no_locks().
 *
 * Takes current_out_packet_mutex and then out_packet_mutex, the same order
 * in which packet__write() acquires them, runs the cleanup, and releases the
 * two mutexes in reverse order.
 */
void packet__cleanup_all(struct mosquitto *mosq)
{
pthread_mutex_lock(&mosq->current_out_packet_mutex);
pthread_mutex_lock(&mosq->out_packet_mutex);
packet__cleanup_all_no_locks(mosq);
pthread_mutex_unlock(&mosq->out_packet_mutex);
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
}
/*
 * Append a fully built packet to the tail of mosq's outgoing queue.
 *
 * The packet's pos/to_process are reset so the whole buffer
 * (packet->packet_length bytes) is pending, and it is linked in under
 * out_packet_mutex.
 *
 * What happens after queuing depends on the build:
 *   - Broker with a websocket handle (mosq->wsi): libwebsockets is asked for
 *     a writable callback and MOSQ_ERR_SUCCESS is returned.
 *   - Broker otherwise: packet__write() is called and its result returned.
 *   - Client: one byte is written to sockpairW when that socket is valid,
 *     then packet__write() is called only when not inside a callback and
 *     mosq->threaded is mosq_ts_none; otherwise MOSQ_ERR_SUCCESS is returned.
 */
int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
{
#ifndef WITH_BROKER
	char sockpair_data = 0;
#endif
	assert(mosq);
	assert(packet);

	packet->next = NULL;
	packet->pos = 0;
	packet->to_process = packet->packet_length;

	pthread_mutex_lock(&mosq->out_packet_mutex);
	if(mosq->out_packet == NULL){
		/* Empty queue: this packet becomes the head. */
		mosq->out_packet = packet;
	}else{
		mosq->out_packet_last->next = packet;
	}
	mosq->out_packet_last = packet;
	pthread_mutex_unlock(&mosq->out_packet_mutex);

#ifdef WITH_BROKER
#  ifdef WITH_WEBSOCKETS
	if(mosq->wsi){
		lws_callback_on_writable(mosq->wsi);
		return MOSQ_ERR_SUCCESS;
	}
#  endif
	return packet__write(mosq);
#else
	if(mosq->sockpairW != INVALID_SOCKET){
#  ifndef WIN32
		/* The result is deliberately ignored; the empty body only consumes
		 * the return value. */
		if(write(mosq->sockpairW, &sockpair_data, 1)){
		}
#  else
		send(mosq->sockpairW, &sockpair_data, 1, 0);
#  endif
	}

	if(mosq->in_callback || mosq->threaded != mosq_ts_none){
		return MOSQ_ERR_SUCCESS;
	}
	return packet__write(mosq);
#endif
}
/*
 * Check a prospective outgoing packet against mosq->maximum_packet_size.
 *
 * The size compared is remaining_length plus the number of bytes its variable
 * length encoding occupies (packet__varint_bytes()); the leading command byte
 * is not added. A maximum_packet_size of 0 disables the check.
 *
 * Returns MOSQ_ERR_OVERSIZE_PACKET when the computed size exceeds the limit,
 * MOSQ_ERR_SUCCESS otherwise.
 */
int packet__check_oversize(struct mosquitto *mosq, uint32_t remaining_length)
{
	if(mosq->maximum_packet_size != 0){
		uint32_t wire_len = remaining_length + packet__varint_bytes(remaining_length);

		if(wire_len > mosq->maximum_packet_size){
			return MOSQ_ERR_OVERSIZE_PACKET;
		}
	}
	return MOSQ_ERR_SUCCESS;
}
/*
 * Send as many queued outgoing packets as the socket will accept.
 *
 * Packets are taken one at a time from mosq->out_packet into
 * mosq->current_out_packet and written with net__write() until the queue is
 * empty or a write cannot proceed. A partially written packet stays in
 * current_out_packet with its pos/to_process updated, so a later call resumes
 * where this one stopped.
 *
 * Returns:
 *   MOSQ_ERR_INVAL     - mosq is NULL.
 *   MOSQ_ERR_NO_CONN   - mosq->sock is INVALID_SOCKET.
 *   MOSQ_ERR_SUCCESS   - everything was sent, the connection is still pending,
 *                        or the write failed with EAGAIN / EWOULDBLOCK / EINTR
 *                        (also WSAENOTCONN on WIN32).
 *   MOSQ_ERR_CONN_LOST - the write failed with ECONNRESET.
 *   MOSQ_ERR_ERRNO     - any other write error.
 */
int packet__write(struct mosquitto *mosq)
{
ssize_t write_length;
struct mosquitto__packet *packet;
enum mosquitto_client_state state;
if(!mosq) return MOSQ_ERR_INVAL;
if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
/* Lock order: current_out_packet_mutex first, then out_packet_mutex. */
pthread_mutex_lock(&mosq->current_out_packet_mutex);
pthread_mutex_lock(&mosq->out_packet_mutex);
/* Nothing in flight: promote the head of the queue, clearing the tail
 * pointer if that empties the queue. */
if(mosq->out_packet && !mosq->current_out_packet){
mosq->current_out_packet = mosq->out_packet;
mosq->out_packet = mosq->out_packet->next;
if(!mosq->out_packet){
mosq->out_packet_last = NULL;
}
}
pthread_mutex_unlock(&mosq->out_packet_mutex);
state = mosquitto__get_state(mosq);
/* Do not write while the connection is still pending; the client TLS build
 * additionally holds off while mosq->want_connect is set. */
#if defined(WITH_TLS) && !defined(WITH_BROKER)
if(state == mosq_cs_connect_pending || mosq->want_connect){
#else
if(state == mosq_cs_connect_pending){
#endif
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
return MOSQ_ERR_SUCCESS;
}
while(mosq->current_out_packet){
packet = mosq->current_out_packet;
/* Push out the rest of the current packet. */
while(packet->to_process > 0){
write_length = net__write(mosq, &(packet->payload[packet->pos]), packet->to_process);
if(write_length > 0){
G_BYTES_SENT_INC(write_length);
packet->to_process -= (uint32_t)write_length;
packet->pos += (uint32_t)write_length;
}else{
#ifdef WIN32
errno = WSAGetLastError();
#endif
if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK
#ifdef WIN32
|| errno == WSAENOTCONN
#endif
){
/* Would block: leave the packet in place and report success. */
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
return MOSQ_ERR_SUCCESS;
}else{
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
switch(errno){
case COMPAT_ECONNRESET:
return MOSQ_ERR_CONN_LOST;
case COMPAT_EINTR:
return MOSQ_ERR_SUCCESS;
default:
return MOSQ_ERR_ERRNO;
}
}
}
}
/* The whole packet has been written. */
G_MSGS_SENT_INC(1);
/* Mask 0xF6 keeps the command nibble plus flag bits 0x06, so this matches a
 * PUBLISH whose 0x06 bits are zero (the QoS bits of the MQTT fixed header,
 * i.e. QoS 0). */
if(((packet->command)&0xF6) == CMD_PUBLISH){
G_PUB_MSGS_SENT_INC(1);
#ifndef WITH_BROKER
/* Client only: invoke the publish callbacks now, with in_callback set for
 * the duration of each call. */
pthread_mutex_lock(&mosq->callback_mutex);
if(mosq->on_publish){
mosq->in_callback = true;
mosq->on_publish(mosq, mosq->userdata, packet->mid);
mosq->in_callback = false;
}
if(mosq->on_publish_v5){
mosq->in_callback = true;
mosq->on_publish_v5(mosq, mosq->userdata, packet->mid, 0, NULL);
mosq->in_callback = false;
}
pthread_mutex_unlock(&mosq->callback_mutex);
}else if(((packet->command)&0xF0) == CMD_DISCONNECT){
/* Client only: a DISCONNECT has been sent, so finish the disconnect and
 * stop writing.
 * NOTE(review): this path returns without unlocking
 * current_out_packet_mutex and without updating mosq->current_out_packet
 * in this function, although the packet is freed here. Presumably
 * do_client_disconnect() takes care of both - confirm against its
 * definition. */
do_client_disconnect(mosq, MOSQ_ERR_SUCCESS, NULL);
packet__cleanup(packet);
mosquitto__free(packet);
return MOSQ_ERR_SUCCESS;
#endif
}else if(((packet->command)&0xF0) == CMD_PUBLISH){
/* Any other PUBLISH (non-zero 0x06 bits): statistics only. */
G_PUB_MSGS_SENT_INC(1);
}
/* Move on to the next queued packet before freeing the one just sent. */
pthread_mutex_lock(&mosq->out_packet_mutex);
mosq->current_out_packet = mosq->out_packet;
if(mosq->out_packet){
mosq->out_packet = mosq->out_packet->next;
if(!mosq->out_packet){
mosq->out_packet_last = NULL;
}
}
pthread_mutex_unlock(&mosq->out_packet_mutex);
packet__cleanup(packet);
mosquitto__free(packet);
/* A packet went out, so push the next-outgoing-message deadline forward by
 * the keepalive interval. */
#ifdef WITH_BROKER
mosq->next_msg_out = db.now_s + mosq->keepalive;
#else
pthread_mutex_lock(&mosq->msgtime_mutex);
mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
pthread_mutex_unlock(&mosq->msgtime_mutex);
#endif
}
pthread_mutex_unlock(&mosq->current_out_packet_mutex);
return MOSQ_ERR_SUCCESS;
}
/*
 * Read (or continue reading) one incoming packet and dispatch it.
 *
 * The function is resumable: progress is kept in mosq->in_packet, so when a
 * read would block it returns MOSQ_ERR_SUCCESS and the next call carries on
 * from the same point. Reading happens in three stages:
 *   1. the command byte,
 *   2. the variable length "remaining length" field,
 *   3. remaining_length bytes of payload.
 * Once the packet is complete it is passed to handle__packet(), the incoming
 * packet state is reset, and the last-message-in time is refreshed.
 *
 * Returns:
 *   MOSQ_ERR_INVAL           - mosq is NULL.
 *   MOSQ_ERR_NO_CONN         - mosq->sock is INVALID_SOCKET.
 *   MOSQ_ERR_SUCCESS         - connection still pending, or a read failed
 *                              with EAGAIN / EWOULDBLOCK / EINTR.
 *   MOSQ_ERR_CONN_LOST       - a read returned 0 at any stage, or failed with
 *                              ECONNRESET.
 *   MOSQ_ERR_PROTOCOL        - more than four remaining-length bytes, or
 *                              (broker) the first-command check failed.
 *   MOSQ_ERR_OVERSIZE_PACKET - (broker) packet exceeds the configured
 *                              max_packet_size.
 *   MOSQ_ERR_NOMEM           - payload allocation failed.
 *   MOSQ_ERR_ERRNO           - any other read error.
 *   Otherwise the return value of handle__packet().
 */
int packet__read(struct mosquitto *mosq)
{
	uint8_t byte;
	ssize_t read_length;
	int rc = 0;
	enum mosquitto_client_state state;

	if(!mosq){
		return MOSQ_ERR_INVAL;
	}
	if(mosq->sock == INVALID_SOCKET){
		return MOSQ_ERR_NO_CONN;
	}

	state = mosquitto__get_state(mosq);
	if(state == mosq_cs_connect_pending){
		return MOSQ_ERR_SUCCESS;
	}

	/* Stage 1: command byte. A zero command means none has been read yet. */
	if(!mosq->in_packet.command){
		read_length = net__read(mosq, &byte, 1);
		if(read_length == 1){
			mosq->in_packet.command = byte;
#ifdef WITH_BROKER
			G_BYTES_RECEIVED_INC(1);
			/* For a non-bridge context in state mosq_cs_connected, any
			 * command other than CONNECT is rejected. */
			if(!(mosq->bridge) && state == mosq_cs_connected && (byte&0xF0) != CMD_CONNECT){
				return MOSQ_ERR_PROTOCOL;
			}
#endif
		}else{
			if(read_length == 0){
				return MOSQ_ERR_CONN_LOST; /* EOF */
			}
#ifdef WIN32
			errno = WSAGetLastError();
#endif
			if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
				return MOSQ_ERR_SUCCESS;
			}else{
				switch(errno){
					case COMPAT_ECONNRESET:
						return MOSQ_ERR_CONN_LOST;
					case COMPAT_EINTR:
						return MOSQ_ERR_SUCCESS;
					default:
						return MOSQ_ERR_ERRNO;
				}
			}
		}
	}

	/* Stage 2: remaining length. remaining_count doubles as a progress
	 * marker here:
	 *    0  no length bytes read yet,
	 *   <0  some length bytes read, more to come (counts downwards),
	 *   >0  the length field is complete (sign flipped below).
	 */
	if(mosq->in_packet.remaining_count <= 0){
		do{
			read_length = net__read(mosq, &byte, 1);
			if(read_length == 1){
				mosq->in_packet.remaining_count--;
				/* At most four length bytes are accepted. */
				if(mosq->in_packet.remaining_count < -4){
					return MOSQ_ERR_PROTOCOL;
				}

				G_BYTES_RECEIVED_INC(1);
				mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
				mosq->in_packet.remaining_mult *= 128;
			}else{
				if(read_length == 0){
					return MOSQ_ERR_CONN_LOST; /* EOF */
				}
#ifdef WIN32
				errno = WSAGetLastError();
#endif
				if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
					return MOSQ_ERR_SUCCESS;
				}else{
					switch(errno){
						case COMPAT_ECONNRESET:
							return MOSQ_ERR_CONN_LOST;
						case COMPAT_EINTR:
							return MOSQ_ERR_SUCCESS;
						default:
							return MOSQ_ERR_ERRNO;
					}
				}
			}
		}while((byte & 128) != 0);
		/* Length field complete: make remaining_count positive so this stage
		 * is skipped when the function is re-entered. */
		mosq->in_packet.remaining_count = (int8_t)(mosq->in_packet.remaining_count * -1);

#ifdef WITH_BROKER
		if(db.config->max_packet_size > 0 && mosq->in_packet.remaining_length+1 > db.config->max_packet_size){
			if(mosq->protocol == mosq_p_mqtt5){
				send__disconnect(mosq, MQTT_RC_PACKET_TOO_LARGE, NULL);
			}
			return MOSQ_ERR_OVERSIZE_PACKET;
		}
#else
		/* Client build: no size limit is applied to incoming packets here. */
#endif
		if(mosq->in_packet.remaining_length > 0){
			mosq->in_packet.payload = mosquitto__malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
			if(!mosq->in_packet.payload){
				return MOSQ_ERR_NOMEM;
			}
			mosq->in_packet.to_process = mosq->in_packet.remaining_length;
		}
	}

	/* Stage 3: payload. */
	while(mosq->in_packet.to_process>0){
		read_length = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
		if(read_length > 0){
			G_BYTES_RECEIVED_INC(read_length);
			mosq->in_packet.to_process -= (uint32_t)read_length;
			mosq->in_packet.pos += (uint32_t)read_length;
		}else{
			/* Treat a zero-length read exactly as the two header stages do.
			 * Without this check the result would be derived from whatever
			 * errno happens to hold, which a zero return does not set. */
			if(read_length == 0){
				return MOSQ_ERR_CONN_LOST; /* EOF */
			}
#ifdef WIN32
			errno = WSAGetLastError();
#endif
			if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
				/* With more than 1000 bytes still outstanding, count the
				 * partial data as activity so the last-message-in time is
				 * refreshed while the packet is still arriving. */
				if(mosq->in_packet.to_process > 1000){
#ifdef WITH_BROKER
					keepalive__update(mosq);
#else
					pthread_mutex_lock(&mosq->msgtime_mutex);
					mosq->last_msg_in = mosquitto_time();
					pthread_mutex_unlock(&mosq->msgtime_mutex);
#endif
				}
				return MOSQ_ERR_SUCCESS;
			}else{
				switch(errno){
					case COMPAT_ECONNRESET:
						return MOSQ_ERR_CONN_LOST;
					case COMPAT_EINTR:
						return MOSQ_ERR_SUCCESS;
					default:
						return MOSQ_ERR_ERRNO;
				}
			}
		}
	}

	/* The whole packet is in memory: rewind pos to 0 before dispatching. */
	mosq->in_packet.pos = 0;
#ifdef WITH_BROKER
	G_MSGS_RECEIVED_INC(1);
	if(((mosq->in_packet.command)&0xF5) == CMD_PUBLISH){
		G_PUB_MSGS_RECEIVED_INC(1);
	}
#endif
	rc = handle__packet(mosq);

	/* Release the payload and reset state ready for the next packet. */
	packet__cleanup(&mosq->in_packet);

#ifdef WITH_BROKER
	keepalive__update(mosq);
#else
	pthread_mutex_lock(&mosq->msgtime_mutex);
	mosq->last_msg_in = mosquitto_time();
	pthread_mutex_unlock(&mosq->msgtime_mutex);
#endif
	return rc;
}