#![allow(
dead_code,
mutable_transmutes,
non_camel_case_types,
non_snake_case,
non_upper_case_globals,
unused_assignments,
unused_mut
)]
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
/// Queue state shared by the `queue_*` functions in this file.
///
/// Nodes are kept in a doubly linked list running from `head` to `tail`;
/// `mutex` and `writing_cond` are the pthread primitives the lock helpers use.
#[derive(Copy, Clone)]
#[repr(C)]
pub struct queue_t {
/// Node count maintained by the enqueue/dequeue paths.
pub len: libc::size_t,
/// Set to `true` by `queue_init` and cleared by `queue_free`; the other
/// operations bail out when it is `false`.
pub in_use: bool,
/// First node of the list (the next one to be dequeued), or null when empty.
pub head: *mut queue_node_t,
/// Last node of the list (where new nodes are appended), or null when empty.
pub tail: *mut queue_node_t,
/// Mutex locked by the `get_write_lock_*` helpers and unlocked by `release_write_lock`.
pub mutex: libc::pthread_mutex_t,
/// Condition variable the lock helpers wait on; `release_write_lock` broadcasts it.
pub writing_cond: libc::pthread_cond_t,
/// Writers wait while this is non-zero. `queue_init` sets it to 0.
/// NOTE(review): nothing in the visible code ever increments it — confirm
/// whether reader-side code exists elsewhere.
pub threads_reading: libc::size_t,
}
/// One element of the queue's doubly linked list.
#[derive(Copy, Clone)]
#[repr(C)]
pub struct queue_node_t {
/// Opaque payload pointer stored at enqueue time and handed back on dequeue.
/// Freeing a node never frees this pointer.
pub contents: *mut libc::c_void,
/// Neighbour towards the head of the queue; null for the head node.
pub previous: *mut queue_node_t,
/// Neighbour towards the tail of the queue; null for the tail node.
pub next: *mut queue_node_t,
}
/// Locks the queue mutex for a writer that appends (or tears down) the queue.
///
/// After locking, waits on `writing_cond` for as long as the queue is marked
/// `in_use` and `threads_reading` is non-zero. Returns with the mutex held;
/// pair it with `release_write_lock`.
///
/// # Safety
///
/// `queue` must point to a `queue_t` whose mutex and condition variable have
/// been initialized.
unsafe extern "C" fn get_write_lock_enqueue(queue: *mut queue_t) {
    let mutex = std::ptr::addr_of_mut!((*queue).mutex);
    let cond = std::ptr::addr_of_mut!((*queue).writing_cond);
    libc::pthread_mutex_lock(mutex);
    loop {
        // Stop waiting once the queue is shut down or no reader is active.
        if !(*queue).in_use || (*queue).threads_reading == 0 {
            break;
        }
        libc::pthread_cond_wait(cond, mutex);
    }
}
/// Locks the queue mutex for a writer that removes the head node.
///
/// After locking, waits on `writing_cond` while the queue is marked `in_use`
/// and either has no head node or `threads_reading` is non-zero. So when this
/// returns with `in_use` still set, `head` is non-null. Returns with the mutex
/// held; pair it with `release_write_lock`.
///
/// # Safety
///
/// `queue` must point to a `queue_t` whose mutex and condition variable have
/// been initialized.
unsafe extern "C" fn get_write_lock_dequeue(queue: *mut queue_t) {
    let mutex = std::ptr::addr_of_mut!((*queue).mutex);
    let cond = std::ptr::addr_of_mut!((*queue).writing_cond);
    libc::pthread_mutex_lock(mutex);
    loop {
        if !(*queue).in_use {
            break;
        }
        // Proceed only when there is something to take and no reader is active.
        if !(*queue).head.is_null() && (*queue).threads_reading == 0 {
            break;
        }
        libc::pthread_cond_wait(cond, mutex);
    }
}
/// Wakes every thread waiting on `writing_cond`, then unlocks the queue mutex.
///
/// # Safety
///
/// `queue` must point to an initialized `queue_t` whose mutex is currently
/// held by the calling thread.
unsafe extern "C" fn release_write_lock(queue: *mut queue_t) {
    let cond = std::ptr::addr_of_mut!((*queue).writing_cond);
    let mutex = std::ptr::addr_of_mut!((*queue).mutex);
    // Broadcast while still holding the mutex, then let the waiters in.
    libc::pthread_cond_broadcast(cond);
    libc::pthread_mutex_unlock(mutex);
}
/// Allocates and initializes an empty queue.
///
/// Returns a null pointer when the allocation fails. Otherwise the returned
/// queue has null `head`/`tail`, an initialized mutex and condition variable
/// (default attributes), `threads_reading == 0` and `in_use == true`.
///
/// # Panics
///
/// Panics if `pthread_mutex_init` or `pthread_cond_init` reports an error.
///
/// # Safety
///
/// The returned pointer is owned by the caller and is released by `queue_free`.
#[no_mangle]
pub unsafe extern "C" fn queue_init() -> *mut queue_t {
    let queue = libc::calloc(1, std::mem::size_of::<queue_t>()).cast::<queue_t>();
    if queue.is_null() {
        return std::ptr::null_mut();
    }

    (*queue).head = std::ptr::null_mut();
    (*queue).tail = std::ptr::null_mut();

    // A null attribute pointer requests the default mutex / condvar attributes.
    let pthread_error =
        libc::pthread_mutex_init(std::ptr::addr_of_mut!((*queue).mutex), std::ptr::null());
    assert!(pthread_error == 0);
    let pthread_error = libc::pthread_cond_init(
        std::ptr::addr_of_mut!((*queue).writing_cond),
        std::ptr::null(),
    );
    assert!(pthread_error == 0);

    (*queue).threads_reading = 0;
    (*queue).in_use = true;
    queue
}
/// Installs the first node of an empty queue.
///
/// Allocates a node holding `content_p`, makes it both `head` and `tail`, and
/// sets `len` to 1.
///
/// Returns 0 on success. Returns 1 — leaving the queue untouched — when
/// `queue` is null, when the queue already has a head or tail, or when the
/// node allocation fails.
///
/// # Safety
///
/// `queue`, when non-null, must point to a valid `queue_t`, and the caller
/// must hold whatever lock protects it.
unsafe extern "C" fn initialize_head_node(
    queue: *mut queue_t,
    content_p: *mut libc::c_void,
) -> libc::c_int {
    if queue.is_null() || !(*queue).head.is_null() || !(*queue).tail.is_null() {
        return 1;
    }

    let node = libc::calloc(1, ::std::mem::size_of::<queue_node_t>()) as *mut queue_node_t;
    if node.is_null() {
        // Out of memory: report failure instead of writing through a null pointer.
        return 1;
    }

    (*node).contents = content_p;
    (*node).previous = std::ptr::null_mut();
    (*node).next = std::ptr::null_mut();

    // Publish the node only once it is fully initialized.
    (*queue).head = node;
    (*queue).tail = node;
    (*queue).len = 1;
    0
}
/// Appends the address of `value` to the tail of `queue`.
///
/// The queue stores only the raw address of `value`; it neither copies nor
/// takes ownership of it, so the caller must keep the value alive until it
/// has been dequeued.
///
/// Nothing is enqueued when the queue is no longer `in_use` or when the node
/// allocation for a non-empty queue fails. The queue mutex is always released
/// before this function returns or panics.
///
/// # Panics
///
/// Panics (after releasing the lock) if installing the first node fails, or
/// if the incremented length reaches `usize::MAX`.
pub fn queue_enqueue<T: Sync>(queue: &mut queue_t, value: &mut T) {
    // The lock helpers work on raw pointers. A reference is never null, so no
    // null check is needed here.
    let queue: *mut queue_t = queue;
    let contents = value as *mut T as *mut libc::c_void;

    // SAFETY: `queue` is derived from a live `&mut queue_t`. This assumes the
    // queue was set up by `queue_init`, so its mutex and condition variable
    // are initialized — TODO confirm at call sites.
    unsafe {
        if !(*queue).in_use {
            return;
        }

        get_write_lock_enqueue(queue);

        // Failures are recorded and reported only after the lock is released,
        // so no exit path leaves the mutex held.
        let mut head_init_status: libc::c_int = 0;
        let mut len_in_range = true;

        if (*queue).in_use {
            if (*queue).head.is_null() || (*queue).tail.is_null() {
                head_init_status = initialize_head_node(queue, contents);
            } else {
                let node_to_enqueue =
                    libc::calloc(1, ::std::mem::size_of::<queue_node_t>()) as *mut queue_node_t;
                // On allocation failure the value is silently not enqueued.
                if !node_to_enqueue.is_null() {
                    (*node_to_enqueue).contents = contents;
                    (*node_to_enqueue).previous = (*queue).tail;
                    (*node_to_enqueue).next = std::ptr::null_mut();
                    (*(*queue).tail).next = node_to_enqueue;
                    (*queue).tail = node_to_enqueue;

                    let new_len = (*queue).len.wrapping_add(1);
                    len_in_range = new_len < usize::MAX;
                    (*queue).len = new_len;
                }
            }
        }

        release_write_lock(queue);

        assert!(head_init_status == 0);
        assert!(len_in_range);
    }
}
/// Removes the head node of `queue` and returns the address of its contents.
///
/// Blocks while the queue is `in_use` but empty (see `get_write_lock_dequeue`).
/// The node itself is freed; the contents it pointed to are not.
///
/// Raw pointers are not `Sync`, so the contents pointer is returned as its
/// address: the concrete type behind `impl Sync` is `usize`, and the value is
/// 0 when the queue is not `in_use` and nothing was dequeued.
pub fn queue_dequeue(queue: &mut queue_t) -> impl Sync {
    // The lock helpers work on raw pointers. A reference is never null, so no
    // null check is needed here.
    let queue: *mut queue_t = queue;
    let mut head_value: *mut libc::c_void = std::ptr::null_mut();

    // SAFETY: `queue` is derived from a live `&mut queue_t`. This assumes the
    // queue was set up by `queue_init`, so its mutex and condition variable
    // are initialized — TODO confirm at call sites.
    unsafe {
        if (*queue).in_use {
            get_write_lock_dequeue(queue);
            // With `in_use` still set, the lock helper only returns once
            // `head` is non-null, so dereferencing it below is valid.
            if (*queue).in_use {
                let old_head = (*queue).head;
                head_value = (*old_head).contents;
                if old_head == (*queue).tail {
                    // Removing the only node empties the queue.
                    libc::free(old_head as *mut libc::c_void);
                    (*queue).head = std::ptr::null_mut();
                    (*queue).tail = std::ptr::null_mut();
                    (*queue).len = 0;
                } else {
                    let next_node = (*old_head).next;
                    libc::free(old_head as *mut libc::c_void);
                    (*next_node).previous = std::ptr::null_mut();
                    (*queue).head = next_node;
                    (*queue).len = (*queue).len.wrapping_sub(1);
                }
            }
            release_write_lock(queue);
        }
    }

    head_value as usize
}
/// Tears down a queue created by `queue_init`.
///
/// Does nothing when `queue` is null or already marked not `in_use`.
/// Otherwise it takes the write lock, frees every node (node contents are not
/// freed), clears `head`/`tail`, marks the queue not `in_use`, releases the
/// lock, destroys the mutex and condition variable, and frees the queue.
///
/// # Panics
///
/// Panics if `threads_reading` is non-zero after the lock has been released.
///
/// # Safety
///
/// `queue` must be null or a pointer obtained from `queue_init` that has not
/// been freed yet. The pointer is dangling once this function returns.
#[no_mangle]
pub unsafe extern "C" fn queue_free(queue: *mut queue_t) {
    if queue.is_null() {
        return;
    }
    if !(*queue).in_use {
        return;
    }

    get_write_lock_enqueue(queue);
    if (*queue).in_use {
        // Walk the list from the head, releasing each node in turn.
        let mut cursor = (*queue).head;
        while !cursor.is_null() {
            let following = (*cursor).next;
            libc::free(cursor.cast());
            cursor = following;
        }
        (*queue).head = std::ptr::null_mut();
        (*queue).tail = std::ptr::null_mut();
        (*queue).in_use = false;
    }
    release_write_lock(queue);

    assert!((*queue).threads_reading == 0);
    libc::pthread_mutex_destroy(std::ptr::addr_of_mut!((*queue).mutex));
    libc::pthread_cond_destroy(std::ptr::addr_of_mut!((*queue).writing_cond));
    libc::free(queue.cast());
}