1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Copyright 2017 Kyle Mayes
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Lock-free queues.
//!
//! # Examples
//!
//! ## Bounded SPSC
//!
//! ```
//! extern crate npnc;
//!
//! use std::thread;
//!
//! use npnc::bounded::spsc;
//!
//! fn main() {
//!     let (producer, consumer) = spsc::channel(64);
//!
//!     // Producer
//!     let b = thread::spawn(move || {
//!         for index in 0..32 {
//!             producer.produce(index).unwrap();
//!         }
//!     });
//!
//!     // Consumer
//!     let a = thread::spawn(move || {
//!         loop {
//!             if let Ok(item) = consumer.consume() {
//!                 println!("{}", item);
//!                 if item == 31 {
//!                     break;
//!                 }
//!             }
//!         }
//!     });
//!
//!     a.join().unwrap();
//!     b.join().unwrap();
//! }
//! ```

#![cfg_attr(feature="valgrind", feature(alloc_system))]

#![warn(missing_copy_implementations, missing_debug_implementations, missing_docs)]

#[cfg(feature="valgrind")]
extern crate alloc_system;

extern crate hazard;

use std::error;
use std::fmt;

#[macro_use]
mod utility;
mod buffer;
pub mod bounded;
pub mod unbounded;

/// The number of pointers that fit in a 128 byte cacheline.
///
/// On targets with 32-bit pointers a pointer occupies 4 bytes, so 128 / 4 = 32.
// NOTE(review): only 32-bit and 64-bit pointer widths get a definition here; confirm that other
// targets (e.g. 16-bit) are intentionally unsupported.
#[cfg(target_pointer_width="32")]
const POINTERS: usize = 32;
/// The number of pointers that fit in a 128 byte cacheline.
///
/// On targets with 64-bit pointers a pointer occupies 8 bytes, so 128 / 8 = 16.
#[cfg(target_pointer_width="64")]
const POINTERS: usize = 16;

//================================================
// Enums
//================================================

// ConsumeError __________________________________

/// Indicates the reason a `consume` operation could not return an item.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ConsumeError {
    /// The queue was empty and had no remaining producers.
    Disconnected,
    /// The queue was empty.
    Empty,
}

impl ConsumeError {
    //- Accessors --------------------------------

    /// Returns the human-readable message for this error.
    ///
    /// This is the single source of the message text. Both the `Display` and the `Error`
    /// implementations delegate here, so formatting never has to go through the deprecated
    /// `Error::description` method.
    fn message(&self) -> &'static str {
        match *self {
            ConsumeError::Disconnected => "the queue was empty and had no remaining producers",
            ConsumeError::Empty => "the queue was empty",
        }
    }
}

impl error::Error for ConsumeError {
    /// Returns the same text that the `Display` implementation writes.
    ///
    /// Kept so that existing callers of `description` continue to receive a meaningful message.
    fn description(&self) -> &str {
        self.message()
    }
}

impl fmt::Display for ConsumeError {
    /// Writes the error message verbatim; width and precision flags are not applied.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str(self.message())
    }
}

// ProduceError __________________________________

/// Indicates the reason a `produce` operation rejected an item.
///
/// Every variant carries the rejected item so that it can be recovered with
/// [`item`](#method.item) instead of being dropped.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ProduceError<T> {
    /// The queue had no remaining consumers.
    Disconnected(T),
    /// The queue was full.
    Full(T),
}

impl<T> ProduceError<T> {
    //- Accessors --------------------------------

    /// Returns the human-readable message for this error.
    ///
    /// This is the single source of the message text. Both the `Display` and the `Error`
    /// implementations delegate here, so formatting never has to go through the deprecated
    /// `Error::description` method.
    fn message(&self) -> &'static str {
        match *self {
            ProduceError::Disconnected(_) => "the queue had no remaining consumers",
            ProduceError::Full(_) => "the queue was full",
        }
    }

    //- Consumers --------------------------------

    /// Returns the rejected item.
    pub fn item(self) -> T {
        match self { ProduceError::Disconnected(item) | ProduceError::Full(item) => item }
    }
}

impl<T> error::Error for ProduceError<T> {
    /// Returns the same text that the `Display` implementation writes.
    ///
    /// Kept so that existing callers of `description` continue to receive a meaningful message.
    fn description(&self) -> &str {
        self.message()
    }
}

// `Debug` is written by hand rather than derived so that it places no `T: Debug` bound on the
// item type; the payload is elided as `..`. This lets `ProduceError<T>` implement `Error` (which
// requires `Debug`) for every `T`.
impl<T> fmt::Debug for ProduceError<T> {
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        let rendered = match *self {
            ProduceError::Disconnected(_) => "ProduceError::Disconnected(..)",
            ProduceError::Full(_) => "ProduceError::Full(..)",
        };
        formatter.write_str(rendered)
    }
}

impl<T> fmt::Display for ProduceError<T> {
    /// Writes the error message verbatim; width and precision flags are not applied.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str(self.message())
    }
}