/*
 * libosdp-sys 3.2.1
 *
 * Sys crate for https://github.com/goToMain/libosdp
 * Documentation
 */
#include <stdio.h>
#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include <sys/select.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <utils/utils.h>
#include <utils/sockutils.h>
#include <utils/fdutils.h>
#include <utils/bus_server.h>

/*
 * Shared single-slot "bus" state. A client worker that receives data stores
 * the payload, its length and a message id here; workers copy the payload
 * back out to forward it to their own client. The payload copies in both
 * directions are done while holding bus_global_lock.
 */
int g_message_id;               /* id of the message currently held in g_message_data */
uint8_t g_message_data[1024];   /* payload of the most recently published message */
int g_message_data_length;      /* number of valid bytes in g_message_data */
pthread_mutex_t bus_global_lock; /* initialised in bus_server_start(), destroyed in bus_server_stop() */

/*
 * Per-connection state. One instance is heap-allocated for each accepted
 * client and passed to bus_server_work_fn() as its argument; the work
 * function closes the descriptor and frees the structure when it exits.
 */
struct bus_client_data {
	int fd;         /* connected client socket */
	int message_id; /* id of the last bus message this client sent or was sent */
};

/**
 * Workqueue job that services one connected bus client.
 *
 * Each iteration does two things:
 *  1. Reads from the client. Any bytes received are published into the
 *     shared slot (g_message_data / g_message_data_length) under a new,
 *     strictly increasing g_message_id.
 *  2. If the shared slot holds a message newer than the last one this
 *     client has seen, copies it out and writes it to the client.
 *
 * A client never gets its own message echoed back, because publishing
 * also advances the client's private message_id to the new global id.
 *
 * The global id is bumped rather than overwritten with the client's private
 * counter: a client whose counter lags the global one would otherwise move
 * g_message_id backwards (or leave it unchanged) and the other clients would
 * never notice the new message. The "is there something newer?" test is done
 * with bus_global_lock held so it cannot race with a concurrent publisher.
 *
 * NOTE(review): the descriptor is switched to O_NONBLOCK and the loop has no
 * wait of its own, so this presumably polls continuously; confirm against
 * read_loop()'s behaviour when no data is available.
 *
 * @param arg  heap-allocated struct bus_client_data; ownership passes to this
 *             function, which closes arg->fd and frees arg before returning.
 * @return     always -1; the loop only ends when a read or write fails.
 */
int bus_server_work_fn(void *arg)
{
	struct bus_client_data *p = arg;
	int rc = -1;
	ssize_t ret;
	int pending;
	int bus_data_len;
	uint8_t bus_data[1024];

	fcntl_setfl(p->fd, O_NONBLOCK);

	for (;;) {
		ret = read_loop(p->fd, bus_data, sizeof(bus_data));
		if (ret < 0) {
			perror("read failed!");
			break;
		}
		if (ret > 0) {
			/* Publish: new payload, new global id, and mark it as
			 * already seen by this client. */
			pthread_mutex_lock(&bus_global_lock);
			memcpy(g_message_data, bus_data, (size_t)ret);
			g_message_data_length = (int)ret;
			g_message_id += 1;
			p->message_id = g_message_id;
			pthread_mutex_unlock(&bus_global_lock);
		}

		/* Snapshot any unseen message while holding the lock; the
		 * (potentially slow) socket write happens after unlocking. */
		pending = 0;
		bus_data_len = 0;
		pthread_mutex_lock(&bus_global_lock);
		if (g_message_id > p->message_id) {
			memcpy(bus_data, g_message_data, g_message_data_length);
			bus_data_len = g_message_data_length;
			p->message_id = g_message_id;
			pending = 1;
		}
		pthread_mutex_unlock(&bus_global_lock);

		if (pending) {
			ret = write_loop(p->fd, bus_data, bus_data_len);
			if (ret < 0) {
				perror("write failed!");
				break;
			}
		}
	}
	close(p->fd);
	free(p);
	return rc;
}

/**
 * Hand a freshly accepted client connection to the workqueue.
 *
 * Looks for a work slot in s->work whose status is WQ_WORK_NEW or
 * WQ_WORK_COMPLETE, attaches a new struct bus_client_data for the
 * descriptor, and queues bus_server_work_fn() on it. It then waits until
 * the slot's status becomes WQ_WORK_IN_PROGRESS before returning.
 *
 * @param s   server whose work slots and workqueue are used.
 * @param fd  connected client socket.
 * @return    0 once the job has been picked up; -1 if every slot is busy or
 *            the per-client allocation failed. On -1 the descriptor is left
 *            open; the caller is expected to close it.
 */
int bus_server_queue_work(bus_server_t *s, int fd)
{
	int i;
	work_t *work = NULL;
	struct bus_client_data *client_data;

	for (i = 0; i < s->max_clients; i++) {
		work = s->work + i;
		if (work->status == WQ_WORK_NEW ||
		    work->status == WQ_WORK_COMPLETE)
			break;
	}

	if (i == s->max_clients)
		return -1;

	/* Freed by bus_server_work_fn() when the client job ends. */
	client_data = calloc(1, sizeof(*client_data));
	if (client_data == NULL) {
		perror("client data alloc failed");
		return -1;
	}
	client_data->fd = fd;

	memset(work, 0, sizeof(work_t));
	work->arg = client_data;
	work->work_fn = bus_server_work_fn;

	workqueue_add_work(&s->wq, work);

	/*
	 * Busy-wait until the slot is marked in-progress so the next call
	 * does not pick the same slot again.
	 * NOTE(review): presumably the workqueue runner sets this status when
	 * it starts the job; if a job could finish before this thread observes
	 * WQ_WORK_IN_PROGRESS, this loop would not terminate - confirm.
	 */
	while (READ_ONCE(work->status) != WQ_WORK_IN_PROGRESS) {
		/* spin */
	}

	return 0;
}

/**
 * Acceptor thread body: accepts client connections on the listening socket
 * and queues a worker job for each one.
 *
 * accept() failures with EAGAIN or EINTR are retried; any other failure is
 * reported and ends the thread. A connection that cannot be queued is
 * closed immediately.
 *
 * @param arg  the bus_server_t this thread serves.
 * @return     always NULL.
 */
void *bus_server_serve(void *arg)
{
	int rc, fd;
	bus_server_t *s = arg;
	struct sockaddr_un cli_addr;
	socklen_t len;

	for (;;) {
		/* accept() treats the length as value-result: it must hold the
		 * buffer size on entry and is overwritten on return, so reset
		 * it before every call. */
		len = sizeof(cli_addr);
		fd = accept(s->fd, (struct sockaddr *)&cli_addr, &len);
		if (fd < 0 && (errno == EAGAIN || errno == EINTR))
			continue;
		if (fd < 0) {
			perror("accept failed");
			break;
		}
		rc = bus_server_queue_work(s, fd);
		if (rc < 0) {
			printf("client[%d]: workqueue full; closing.\n", fd);
			close(fd);
		}
	}
	return NULL;
}

/**
 * Initialise a bus server and start its acceptor thread.
 *
 * Zeroes *s, initialises the global bus lock, creates the workqueue and the
 * array of work slots, opens a listening Unix socket at path and spawns
 * bus_server_serve() on it.
 *
 * On any failure everything acquired so far is released again (in reverse
 * order of acquisition) before returning, so a failed start leaves no
 * socket, thread, or allocation behind.
 *
 * @param s            server object to initialise; previous contents are
 *                     overwritten.
 * @param max_clients  number of work slots; also passed to
 *                     workqueue_create() and sock_unix_listen().
 * @param path         filesystem path of the Unix socket; a private copy is
 *                     kept in s->path.
 * @return             0 on success, -1 on failure.
 */
int bus_server_start(bus_server_t *s, int max_clients, const char *path)
{
	int rc;

	memset(s, 0, sizeof(bus_server_t));

	pthread_mutex_init(&bus_global_lock, NULL);

	rc = workqueue_create(&s->wq, max_clients);
	if (rc < 0) {
		printf("failed to setup workqueue\n");
		goto err_mutex;
	}

	s->work = calloc(max_clients, sizeof(work_t));
	if (s->work == NULL) {
		perror("work alloc failed");
		goto err_wq;
	}

	rc = sock_unix_listen(path, max_clients);
	if (rc < 0) {
		perror("sock_unix_listen failed");
		goto err_work;
	}
	s->fd = rc;
	s->max_clients = max_clients;

	s->path = strdup(path);
	if (s->path == NULL) {
		perror("path alloc failed");
		goto err_sock;
	}

	/* pthread_create() reports failure through a non-zero return value
	 * (an error number); it does not return a negative value or set
	 * errno, so perror() would print an unrelated message. */
	rc = pthread_create(&s->thread, NULL, bus_server_serve, (void *)s);
	if (rc != 0) {
		fprintf(stderr, "pthread_create failed: %s\n", strerror(rc));
		goto err_path;
	}

	return 0;

err_path:
	free(s->path);
	s->path = NULL;
err_sock:
	close(s->fd);
	unlink(path);
err_work:
	free(s->work);
	s->work = NULL;
err_wq:
	workqueue_destroy(&s->wq);
err_mutex:
	pthread_mutex_destroy(&bus_global_lock);
	return -1;
}

/**
 * Stop a server previously started with bus_server_start() and release its
 * resources.
 *
 * The acceptor thread is cancelled and joined before the workqueue is
 * destroyed. In the opposite order the acceptor could still accept a client
 * and queue work on a destroyed workqueue, and could then wait indefinitely
 * for that work to start - in a loop with no cancellation point - which
 * would make the join below hang.
 *
 * @param s  server to stop; must have been successfully started.
 */
void bus_server_stop(bus_server_t *s)
{
	pthread_cancel(s->thread);
	pthread_join(s->thread, NULL);

	workqueue_destroy(&s->wq);

	pthread_mutex_destroy(&bus_global_lock);

	close(s->fd);
	unlink(s->path);
	free(s->path);
	s->path = NULL;

	/* The slot array is allocated in bus_server_start().
	 * NOTE(review): assumes workqueue_destroy() has stopped every runner
	 * that could still touch a work slot - confirm. */
	free(s->work);
	s->work = NULL;
}