#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include "dada_udp.h"
/* Presumably the largest kernel socket buffer size to request (67108864 = 64 * 1024 * 1024);
 * not referenced by the code in this part of the file -- TODO confirm usage elsewhere. */
#define KERNEL_BUFFER_SIZE_MAX 67108864
/* Fallback buffer size that dada_udp_sock_set_size() applies when the preferred size is not accepted. */
#define KERNEL_BUFFER_SIZE_DEFAULT 131071
/* NOTE: the expansion includes the leading '=', i.e. it expands to "= {0, 0, 0, 0}".
 * Presumably meant to be appended to a declaration such as "stats_t s STATS_INIT;" -- confirm against callers. */
#define STATS_INIT = {0, 0, 0, 0}
/**
 * Allocate a stats_t on the heap and zero its counters.
 *
 * @return pointer to the newly allocated structure (the caller releases it
 *         with free()), or NULL when the allocation fails.
 */
stats_t * init_stats_t()
{
  stats_t * s = malloc(sizeof *s);

  /* Do not hand a NULL pointer to reset_stats_t(): it writes through it. */
  if (s == NULL)
    return NULL;

  reset_stats_t (s);
  return s;
}
/**
 * Set all four counters of a stats_t back to zero.
 *
 * @param s  structure to reset; a NULL pointer is ignored.
 */
void reset_stats_t(stats_t * s)
{
  /* Tolerate NULL so a failed allocation upstream cannot crash here. */
  if (s == NULL)
    return;

  s->received = 0;
  s->dropped = 0;
  s->received_per_sec = 0;
  s->dropped_per_sec = 0;
}
/**
 * Create a UDP socket and bind it to a local IPv4 address and port.
 *
 * @param log      multilog handle that receives the error / info messages
 * @param iface    dotted-quad IPv4 address to bind to, or the literal "any"
 *                 to bind to INADDR_ANY
 * @param port     UDP port number in host byte order
 * @param verbose  when non-zero, progress messages are logged at LOG_INFO
 * @return the bound socket descriptor on success, -1 on failure (the socket
 *         is closed again before returning an error)
 */
int dada_udp_sock_in(multilog_t* log, const char* iface, int port, int verbose)
{
  int fd = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
  if (fd < 0) {
    multilog(log, LOG_ERR, "dada_udp_sock_in: socket() failed [iface=%s, port=%d]: %s\n",
             iface, port, strerror(errno));
    return -1;
  }
  if (verbose)
    multilog(log, LOG_INFO, "created UDP socket\n");

  /* SO_REUSEADDR only influences bind() when it is set beforehand, so it
   * has to be applied here and not after the socket is already bound.
   * A failure is not fatal: the bind below is still attempted. */
  int on = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0) {
    multilog (log, LOG_WARNING, "dada_udp_sock_in: setsockopt(SO_REUSEADDR) failed "
              "[iface=%s, port=%d]: %s\n", iface, port, strerror(errno));
  }

  /* Zero the whole address so no field is left uninitialised. */
  struct sockaddr_in udp_sock;
  memset(&udp_sock, 0, sizeof udp_sock);
  udp_sock.sin_family = AF_INET;
  udp_sock.sin_port = htons(port);

  if (strcmp(iface, "any") == 0) {
    udp_sock.sin_addr.s_addr = htonl(INADDR_ANY);
  } else {
    udp_sock.sin_addr.s_addr = inet_addr(iface);
    /* inet_addr() reports a parse failure as (in_addr_t) -1, which is also
     * the encoding of 255.255.255.255; only reject when that literal was
     * not what the caller asked for. */
    if (udp_sock.sin_addr.s_addr == (in_addr_t) -1
        && strcmp(iface, "255.255.255.255") != 0) {
      multilog (log, LOG_ERR, "dada_udp_sock_in: invalid interface address "
                "[iface=%s, port=%d]\n", iface, port);
      close(fd);
      return -1;
    }
  }

  if (bind(fd, (struct sockaddr *)&udp_sock, sizeof(udp_sock)) == -1) {
    multilog (log, LOG_ERR, "dada_udp_sock_in: bind() failed [iface=%s, port=%d]: %s\n",
              iface, port, strerror(errno));
    close(fd);   /* do not leak the descriptor on the error path */
    return -1;
  }

  if (verbose) {
    multilog(log, LOG_INFO, "bound UDP socket to %s:%d\n", iface, port);
    multilog(log, LOG_INFO, "UDP socket bound to %s:%d\n", iface, port);
  }
  return fd;
}
/**
 * Request a receive-buffer size for a socket.
 *
 * Convenience wrapper that forwards to dada_udp_sock_set_size() with the
 * SO_RCVBUF option selected.
 *
 * @param log        multilog handle passed through to the generic setter
 * @param fd         socket descriptor
 * @param verbose    passed through to the generic setter
 * @param pref_size  preferred buffer size
 * @return whatever dada_udp_sock_set_size() returns
 */
int dada_udp_sock_set_buffer_size (multilog_t* log, int fd, int verbose, int pref_size)
{
  const int status = dada_udp_sock_set_size(log, fd, verbose, pref_size, SO_RCVBUF);
  return status;
}
/* Write `requested` to SOL_SOCKET option `opt` of `fd`, then read the option
 * back into *actual. Returns 0 on success; on failure prints via perror()
 * and returns -1. */
static int dada_udp_sock_apply_size (int fd, int opt, int requested, int *actual)
{
  if (setsockopt(fd, SOL_SOCKET, opt, &requested, sizeof requested) != 0) {
    perror("setsockopt");
    return -1;
  }

  int stored = 0;
  socklen_t len = sizeof stored;   /* a real socklen_t, not an int cast to one */
  if (getsockopt(fd, SOL_SOCKET, opt, &stored, &len) != 0) {
    perror("getsockopt");
    return -1;
  }

  *actual = stored;
  return 0;
}

/**
 * Set a socket buffer-size option and verify that the kernel accepted it,
 * falling back to KERNEL_BUFFER_SIZE_DEFAULT when it did not.
 *
 * @param log        multilog handle for warnings / info messages
 * @param fd         socket descriptor
 * @param verbose    when non-zero, a LOG_INFO message is written on success
 * @param pref_size  preferred buffer size
 * @param buf        SOL_SOCKET option name to set (e.g. SO_RCVBUF)
 * @return 0 when the set/read-back calls succeeded (even if only a warning
 *         could be issued about the resulting size), -1 when setsockopt()
 *         or getsockopt() failed
 */
int dada_udp_sock_set_size (multilog_t* log, int fd, int verbose, int pref_size, int buf) {
  const int std_buffer_size = KERNEL_BUFFER_SIZE_DEFAULT;
  int value = 0;

  if (dada_udp_sock_apply_size(fd, buf, pref_size, &value) != 0)
    return -1;

  /* Linux reports twice the requested size on read-back, hence value / 2.
   * The value * 2 comparison is kept from the original acceptance test and
   * is widened so the multiplication cannot overflow a signed int. */
  const int accepted = ((long long) value * 2 == pref_size)
                    || (value / 2 == pref_size);
  if (accepted) {
    if (verbose)
      multilog(log, LOG_INFO, "UDP socket buffer size set to %d\n", pref_size);
    return 0;
  }

  multilog (log, LOG_WARNING, "Warning. Failed to set udp socket's "
            "buffer size to: %d, falling back to default size: %d (return value: %d)\n",
            pref_size, std_buffer_size, value);

  if (dada_udp_sock_apply_size(fd, buf, std_buffer_size, &value) != 0)
    return -1;

  /* A mismatch on the fallback size is only reported, not treated as an error. */
  if (value/2 != std_buffer_size) {
    multilog (log, LOG_WARNING, "Warning. Failed to set udp socket's "
              "buffer size to: %d\n", std_buffer_size);
  }
  return 0;
}
/**
 * Create an outgoing UDP socket and fill in the destination address.
 *
 * @param fd          receives the new socket descriptor; set to -1 when the
 *                    function fails after the socket had been created
 * @param dagram      destination address structure to fill in
 * @param client      destination host (dotted-quad or host name, resolved via
 *                    atoaddr()); used only when bcast is zero
 * @param port        destination UDP port in host byte order
 * @param bcast       non-zero to enable SO_BROADCAST and use bcast_addr
 * @param bcast_addr  dotted-quad broadcast address; used only when bcast is
 *                    non-zero
 * @return 0 on success, 1 on failure
 */
int dada_udp_sock_out(int *fd, struct sockaddr_in * dagram, char *client,
                      int port, int bcast, const char * bcast_addr) {
  int on = 1;

  *fd = socket(PF_INET, SOCK_DGRAM, 0);
  if (*fd < 0) {
    perror("failed to create UDP socket");
    return(1);
  }

  if (bcast) {
    int yes = 1;
    if (setsockopt(*fd, SOL_SOCKET, SO_BROADCAST, &yes, sizeof yes) < 0) {
      perror("Could not SO_BROADCAST");
      goto fail;
    }
  }

  /* Failure to set SO_REUSEADDR is reported but not treated as fatal. */
  if (setsockopt(*fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0)
    fprintf(stderr, "dada_udp_sock_out: setsockopt(SO_REUSEADDR) failed: %s\n",
            strerror(errno));

  /* Zero the whole address (including sin_zero) before filling it in. */
  memset(dagram, 0, sizeof *dagram);
  dagram->sin_family = AF_INET;
  dagram->sin_port = htons(port);

  if (bcast) {
    if (bcast_addr == NULL) {
      fprintf (stderr, "dada_udp_sock_out: no broadcast address supplied\n");
      goto fail;
    }
    /* No (in_addr_t) -1 check here: 255.255.255.255 is a legitimate
     * broadcast address and has that same encoding. */
    dagram->sin_addr.s_addr = inet_addr (bcast_addr);
  }
  else
  {
    struct in_addr *addr = atoaddr(client);
    if (!addr)
    {
      fprintf (stderr, "dada_udp_sock_out: failed atoaddr(%s)\n", client);
      goto fail;
    }
    dagram->sin_addr.s_addr = addr->s_addr;
  }
  return 0;

fail:
  /* Release the socket so an error return never leaks a descriptor. */
  close(*fd);
  *fd = -1;
  return 1;
}
/**
 * Convert a dotted-quad string or a host name into an IPv4 address.
 *
 * The string is first parsed with inet_addr(); if that fails it is looked up
 * with gethostbyname().
 *
 * @param address  dotted-quad IPv4 address or host name; NULL is rejected
 * @return pointer to a function-local static struct in_addr holding the
 *         address (overwritten by the next call, so not reentrant), or NULL
 *         when the input is NULL or cannot be resolved to an IPv4 address
 */
struct in_addr *atoaddr(char *address) {
  static struct in_addr saddr;

  if (address == NULL)
    return NULL;

  saddr.s_addr = inet_addr(address);
  if (saddr.s_addr != (in_addr_t) -1) {
    return &saddr;
  }

  struct hostent *host = gethostbyname(address);
  if (host == NULL
      || host->h_addrtype != AF_INET
      || host->h_length != (int) sizeof saddr
      || host->h_addr_list[0] == NULL) {
    return NULL;
  }

  /* Copy instead of casting h_addr_list[0]: the entry is a char buffer with
   * no alignment guarantee, and copying means the result always lives in the
   * same static object regardless of which path produced it. */
  memcpy(&saddr, host->h_addr_list[0], sizeof saddr);
  return &saddr;
}
/**
 * Receive one datagram from a socket.
 *
 * @param fd      socket descriptor
 * @param buffer  destination buffer
 * @param size    capacity of buffer in bytes
 * @param flags   flags forwarded to recvfrom() (e.g. 0 or MSG_DONTWAIT)
 * @return number of bytes received, or (size_t) -1 when recvfrom() fails.
 *         A failure is reported with perror() unless it is merely
 *         EAGAIN / EWOULDBLOCK (no data available on a non-blocking read).
 */
size_t dada_sock_recv (int fd, char* buffer, size_t size, int flags)
{
  /* recvfrom() returns a signed ssize_t; keeping it signed is what makes the
   * error test below meaningful. */
  const ssize_t received = recvfrom (fd, buffer, size, flags, NULL, NULL);

  if (received < 0) {
    if (errno != EAGAIN && errno != EWOULDBLOCK)
      perror ("sock_recv recvfrom");
    return (size_t) -1;
  }
  if (received == 0) {
    fprintf (stderr, "sock_recv received zero bytes\n");
  }
  return (size_t) received;
}
/**
 * Send one datagram to the given IPv4 destination.
 *
 * @param fd    socket descriptor
 * @param addr  destination address (passed by value)
 * @param data  payload to send
 * @param size  number of payload bytes
 * @return number of bytes sent, or (size_t) -1 when sendto() fails (the
 *         failure is reported with perror()).
 *
 * The previous implementation stored the sendto() result in a size_t, so its
 * "< 0" test could never fire and failures were returned silently as
 * (size_t) -1. The return value is kept; only the diagnostic is now emitted.
 * The unreachable exit(1) is dropped rather than activated so that a send
 * error cannot start terminating callers that never saw that happen.
 */
size_t dada_sock_send(int fd, struct sockaddr_in addr, char *data, size_t size) {
  const socklen_t addr_len = sizeof addr;
  const ssize_t sent = sendto(fd, data, size, 0, (struct sockaddr *) &addr, addr_len);

  if (sent < 0) {
    perror("sendto");
    return (size_t) -1;
  }
  return (size_t) sent;
}
/**
 * Drain datagrams that are already queued on a socket.
 *
 * Datagrams are read and discarded until the queue is empty, a read fails,
 * or a datagram whose length differs from `size` is encountered (that
 * datagram is consumed but not counted).
 *
 * @param fd    socket descriptor
 * @param size  expected datagram size in bytes; also the scratch buffer size
 * @return total number of bytes discarded in correctly sized datagrams, or
 *         (size_t) -1 when the scratch buffer cannot be allocated
 */
size_t dada_sock_clear_buffered_packets (int fd, size_t size)
{
  size_t bytes_cleared = 0;

  char * buffer = malloc(size);
  if (!buffer) {
    fprintf(stderr, "dada_sock_clear_buffered_packets: malloc %zu bytes failed\n", size);
    return (size_t) -1;
  }

  for (;;)
  {
    /* MSG_DONTWAIT: only already-buffered packets are to be removed, so the
     * read must never block, even when the caller's socket is in blocking
     * mode. */
    const ssize_t bytes_read = recvfrom (fd, buffer, size, MSG_DONTWAIT, NULL, NULL);

    if (bytes_read < 0)
    {
      const int errsv = errno;   /* save before any other call can change it */
      /* EAGAIN / EWOULDBLOCK simply mean the queue is now empty. */
      if (errsv != EAGAIN && errsv != EWOULDBLOCK)
        fprintf(stderr, "dada_sock_clear_buffered_packets: recvfrom failed: %s\n", strerror(errsv));
      break;
    }

    if ((size_t) bytes_read != size)
    {
      fprintf(stderr, "dada_sock_clear_buffered_packets: received %zd byte packet, expected %zu\n", bytes_read, size);
      break;
    }

    bytes_cleared += (size_t) bytes_read;
  }

  free(buffer);
  return bytes_cleared;
}