#include "queue.h"
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
struct queue {
size_t len;
bool in_use;
queue_node_t *head;
queue_node_t *tail;
pthread_mutex_t mutex;
pthread_cond_t writing_cond;
size_t threads_reading;
};
struct queue_node {
void *contents;
queue_node_t *previous;
queue_node_t *next;
};
static void get_write_lock_enqueue(queue_t *queue) {
pthread_mutex_lock(&queue->mutex);
while (queue->in_use && queue->threads_reading > 0) {
pthread_cond_wait(&queue->writing_cond, &queue->mutex);
}
}
static void get_write_lock_dequeue(queue_t *queue) {
pthread_mutex_lock(&queue->mutex);
while (queue->in_use && (queue->head == NULL || queue->threads_reading > 0)) {
pthread_cond_wait(&queue->writing_cond, &queue->mutex);
}
}
static void release_write_lock(queue_t *queue) {
pthread_cond_broadcast(&queue->writing_cond);
pthread_mutex_unlock(&queue->mutex);
}
queue_t *queue_init(void) {
queue_t *queue = (queue_t *) calloc(1, sizeof(queue_t));
if (queue == NULL) {
return NULL;
}
queue->head = NULL;
queue->tail = NULL;
int pthread_error = 0;
pthread_error = pthread_mutex_init(&queue->mutex, NULL);
assert(!pthread_error);
pthread_error = pthread_cond_init(&queue->writing_cond, NULL);
assert(!pthread_error);
queue->threads_reading = 0;
queue->in_use = true;
return queue;
}
static int initialize_head_node(queue_t *queue, void *content_p) {
if (queue == NULL || queue->head != NULL ||
queue->tail != NULL) {
return 1;
}
queue->head = (queue_node_t *) calloc(1, sizeof(queue_node_t));
queue->head->contents = content_p;
queue->head->previous = NULL;
queue->head->next = NULL;
queue->tail = queue->head;
queue->len = 1;
return 0;
}
void queue_enqueue(queue_t *queue, void *value) {
if (queue == NULL || !queue->in_use) {
return;
}
get_write_lock_enqueue(queue);
if (!queue->in_use) {
return;
}
else if (queue->head == NULL || queue->tail == NULL) {
int i = initialize_head_node(queue, value);
assert(i == 0);
}
else {
queue_node_t *node_to_enqueue = (queue_node_t *) calloc(1, sizeof(queue_node_t));
if (node_to_enqueue == NULL) {
return;
}
else {
node_to_enqueue->contents = value;
node_to_enqueue->previous = queue->tail;
node_to_enqueue->next = NULL;
queue->tail->next = node_to_enqueue;
queue->tail = node_to_enqueue;
assert(queue->len + 1 < __SIZE_MAX__);
queue->len += 1;
}
}
release_write_lock(queue);
return;
}
void *queue_dequeue(queue_t *queue) {
if (queue == NULL || !queue->in_use) {
return NULL;
}
void *head_value = NULL;
get_write_lock_dequeue(queue);
if (queue->in_use) {
head_value = queue->head->contents;
if (queue->head == queue->tail) {
free(queue->head);
queue->head = NULL;
queue->tail = NULL;
queue->len = 0;
}
else {
queue_node_t *next_node = queue->head->next;
free(queue->head);
next_node->previous = NULL;
queue->head = next_node;
assert(queue->len - 1 >= 0);
queue->len -= 1;
}
}
release_write_lock(queue);
return head_value;
}
void queue_free(queue_t *queue) {
if (queue == NULL || !queue->in_use) {
return;
}
get_write_lock_enqueue(queue);
if (queue->in_use) {
queue_node_t *next_node;
for (queue_node_t *current_node = queue->head;
current_node != NULL ;
current_node = next_node) {
next_node = current_node->next;
free(current_node);
}
queue->head = NULL;
queue->tail = NULL;
queue->in_use = false;
}
release_write_lock(queue);
assert(queue->threads_reading == 0);
pthread_mutex_destroy(&queue->mutex);
pthread_cond_destroy(&queue->writing_cond);
free(queue);
}