const std = @import("std");
const Thread = std.Thread;
const Mutex = std.Thread.Mutex;
const Condition = std.Thread.Condition;
const Atomic = std.atomic.Atomic;
const Counter = struct {
value: Atomic(u32),
fn init() Counter {
return .{ .value = Atomic(u32).init(0) };
}
fn increment(self: *Counter) void {
_ = self.value.fetchAdd(1, .monotonic);
}
fn get(self: *Counter) u32 {
return self.value.load(.monotonic);
}
};
fn worker_thread(id: usize, counter: *Counter) void {
for (0..1000) |_| {
counter.increment();
}
std.debug.print("Worker {} finished\n", .{id});
}
test "atomic counter" {
var counter = Counter.init();
const t1 = try Thread.spawn(.{}, worker_thread, .{ 1, &counter });
const t2 = try Thread.spawn(.{}, worker_thread, .{ 2, &counter });
const t3 = try Thread.spawn(.{}, worker_thread, .{ 3, &counter });
t1.join();
t2.join();
t3.join();
std.debug.print("Final count: {}\n", .{counter.get()});
}
const SafeQueue = struct {
const Self = @This();
allocator: std.mem.Allocator,
mutex: Mutex,
cond: Condition,
items: std.ArrayList(i32),
closed: bool,
fn init(allocator: std.mem.Allocator) Self {
return .{
.allocator = allocator,
.mutex = .{},
.cond = .{},
.items = std.ArrayList(i32).init(allocator),
.closed = false,
};
}
fn deinit(self: *Self) void {
self.items.deinit();
}
fn push(self: *Self, item: i32) !void {
self.mutex.lock();
defer self.mutex.unlock();
try self.items.append(item);
self.cond.signal();
}
fn pop(self: *Self) ?i32 {
self.mutex.lock();
defer self.mutex.unlock();
while (self.items.items.len == 0 and !self.closed) {
self.cond.wait(&self.mutex);
}
if (self.items.items.len == 0) return null;
return self.items.orderedRemove(0);
}
fn close(self: *Self) void {
self.mutex.lock();
defer self.mutex.unlock();
self.closed = true;
self.cond.broadcast();
}
};
fn producer(queue: *SafeQueue, start: i32, count: i32) !void {
var i: i32 = 0;
while (i < count) : (i += 1) {
try queue.push(start + i);
std.time.sleep(1 * std.time.ns_per_ms);
}
}
fn consumer(queue: *SafeQueue, id: usize) void {
while (queue.pop()) |item| {
std.debug.print("Consumer {} got: {}\n", .{ id, item });
}
}
test "safe queue" {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var queue = SafeQueue.init(allocator);
defer queue.deinit();
const p1 = try Thread.spawn(.{}, producer, .{ &queue, 0, 5 });
const c1 = try Thread.spawn(.{}, consumer, .{ &queue, 1 });
p1.join();
queue.close();
c1.join();
}
const Semaphore = struct {
const Self = @This();
count: Atomic(i32),
mutex: Mutex,
cond: Condition,
fn init(initial: i32) Self {
return .{
.count = Atomic(i32).init(initial),
.mutex = .{},
.cond = .{},
};
}
fn wait(self: *Self) void {
self.mutex.lock();
defer self.mutex.unlock();
while (self.count.load(.monotonic) <= 0) {
self.cond.wait(&self.mutex);
}
_ = self.count.fetchSub(1, .monotonic);
}
fn post(self: *Self) void {
_ = self.count.fetchAdd(1, .monotonic);
self.cond.signal();
}
};
const ReadWriteLock = struct {
const Self = @This();
mutex: Mutex,
read_cond: Condition,
write_cond: Condition,
readers: i32,
writer: bool,
fn init() Self {
return .{
.mutex = .{},
.read_cond = .{},
.write_cond = .{},
.readers = 0,
.writer = false,
};
}
fn readLock(self: *Self) void {
self.mutex.lock();
defer self.mutex.unlock();
while (self.writer) {
self.read_cond.wait(&self.mutex);
}
self.readers += 1;
}
fn readUnlock(self: *Self) void {
self.mutex.lock();
defer self.mutex.unlock();
self.readers -= 1;
if (self.readers == 0) {
self.write_cond.signal();
}
}
fn writeLock(self: *Self) void {
self.mutex.lock();
defer self.mutex.unlock();
while (self.writer or self.readers > 0) {
self.write_cond.wait(&self.mutex);
}
self.writer = true;
}
fn writeUnlock(self: *Self) void {
self.mutex.lock();
defer self.mutex.unlock();
self.writer = false;
self.read_cond.broadcast();
self.write_cond.signal();
}
};
fn worker_pool_task(id: usize, work: []const u8) void {
std.debug.print("Worker {} processing: {s}\n", .{ id, work });
}
test "thread pool pattern" {
const pool_size = 4;
const work_items = [_][]const u8{
"task 1",
"task 2",
"task 3",
"task 4",
"task 5",
};
var threads: [pool_size]Thread = undefined;
var thread_index: usize = 0;
for (work_items, 0..) |work, i| {
threads[thread_index] = try Thread.spawn(.{}, worker_pool_task, .{ i % pool_size, work });
thread_index += 1;
if (thread_index == pool_size) {
for (0..pool_size) |j| {
threads[j].join();
}
thread_index = 0;
}
}
for (0..thread_index) |j| {
threads[j].join();
}
}
pub fn main() !void {
std.debug.print("Concurrency examples\n", .{});
}