codspeed 4.5.0

Core instrumentation library for CodSpeed
Documentation
const bincode = @import("bincode.zig");
const std = @import("std");
const shared = @import("shared.zig");

const fs = std.fs;
const os = std.os;
const mem = std.mem;
const Allocator = std.mem.Allocator;
const Path = []const u8;
pub const Command = shared.Command;

extern "c" fn mkfifo(path: [*:0]const u8, mode: c_uint) c_int;

pub const UnixPipe = struct {
    pub const Reader = struct {
        file: fs.File,
        allocator: Allocator,
        buffer: std.ArrayList(u8),

        pub fn init(file: fs.File, allocator: Allocator) Reader {
            var buffer = std.ArrayList(u8).init(allocator);
            // Pre-allocate 1KB to avoid allocations for typical command sizes
            buffer.ensureTotalCapacity(1024) catch {};
            return .{
                .file = file,
                .allocator = allocator,
                .buffer = buffer,
            };
        }

        pub fn read(self: *Reader, buffer: []u8) !usize {
            return self.file.read(buffer);
        }

        pub fn readAll(self: *Reader, buffer: []u8) !usize {
            return self.file.readAll(buffer);
        }

        // IMPORTANT: Caller is responsible for freeing the returned command.
        pub fn recvCmd(self: *Reader) !Command {
            // First read the length (u32 = 4 bytes)
            var len_buffer: [4]u8 = undefined;
            const len_read = self.file.readAll(&len_buffer) catch |err| {
                // Convert blocking/broken pipe errors to something the timeout logic can handle
                return switch (err) {
                    error.WouldBlock, error.BrokenPipe => error.NotReady,
                    else => err,
                };
            };
            if (len_read < 4) {
                return error.UnexpectedEof;
            }
            const message_len = std.mem.readInt(u32, &len_buffer, std.builtin.Endian.little);

            // Resize buffer to fit message (only allocates if growing)
            try self.buffer.resize(message_len);

            const msg_read = self.file.readAll(self.buffer.items) catch |err| {
                return switch (err) {
                    error.WouldBlock, error.BrokenPipe => error.NotReady,
                    else => err,
                };
            };
            if (msg_read < message_len) {
                return error.UnexpectedEof;
            }

            var stream = std.io.fixedBufferStream(self.buffer.items);
            return try bincode.deserializeAlloc(stream.reader(), self.allocator, Command);
        }

        pub fn waitForResponse(self: *Reader, timeout_ns: ?u64) !Command {
            const start = std.time.nanoTimestamp();
            const timeout = timeout_ns orelse std.time.ns_per_s * 1; // Default 1 second timeout

            while (true) {
                const elapsed = @as(u64, @intCast(std.time.nanoTimestamp() - start));
                if (elapsed > timeout) {
                    return error.AckTimeout;
                }

                const cmd = self.recvCmd() catch |err| {
                    // Only retry on transient errors, propagate fatal ones
                    switch (err) {
                        error.NotReady, error.UnexpectedEof => {
                            const utils = @import("utils.zig");
                            utils.sleep(std.time.ns_per_ms * 10);
                            continue;
                        },
                        else => return err,
                    }
                };

                return cmd;
            }
        }

        pub fn waitForAck(self: *Reader, timeout_ns: ?u64) !void {
            const response = try self.waitForResponse(timeout_ns);
            defer response.deinit(self.allocator);

            switch (response) {
                .Ack => return,
                .Err => return error.UnexpectedError,
                else => {
                    const logger = @import("logger.zig");
                    logger.debug("waitForAck received unexpected response: {}\n", .{response});
                    return error.UnexpectedResponse;
                },
            }
        }

        pub fn deinit(self: *Reader) void {
            // Drain any pending data from the FIFO before closing to prevent
            // stale messages from being read by subsequent connections.
            // This is crucial when multiple instrument types probe the same FIFO
            // (e.g., AnalysisInstrument fails, then PerfInstrument tries).
            var dummy_buffer: [4096]u8 = undefined;
            while (true) {
                const bytes_read = self.file.read(&dummy_buffer) catch break;
                if (bytes_read == 0) break;
            }

            self.buffer.deinit();
            self.file.close();
        }
    };

    pub const Writer = struct {
        file: fs.File,
        allocator: Allocator,
        buffer: std.ArrayList(u8),

        pub fn init(file: fs.File, allocator: Allocator) Writer {
            var buffer = std.ArrayList(u8).init(allocator);
            // Pre-allocate 1KB to avoid allocations for typical command sizes
            buffer.ensureTotalCapacity(1024) catch {};
            return .{
                .file = file,
                .allocator = allocator,
                .buffer = buffer,
            };
        }

        pub fn write(self: *Writer, buffer: []const u8) !usize {
            return self.file.write(buffer);
        }

        pub fn writeAll(self: *Writer, buffer: []const u8) !void {
            return self.file.writeAll(buffer);
        }

        pub fn sendCmd(self: *Writer, cmd: Command) !void {
            // Clear buffer but keep allocated capacity
            self.buffer.clearRetainingCapacity();

            try bincode.serialize(self.buffer.writer(), cmd);

            const bytes = self.buffer.items;
            try self.file.writeAll(std.mem.asBytes(&@as(u32, @intCast(bytes.len))));
            try self.file.writeAll(bytes);
        }

        pub fn deinit(self: *Writer) void {
            self.buffer.deinit();
            self.file.close();
        }
    };

    /// Create a new named pipe at the given path
    pub fn create(path: [*:0]const u8) !void {
        // Remove the previous FIFO (if it exists)
        fs.deleteFileAbsolute(std.mem.span(path)) catch {};

        if (mkfifo(path, 0o700) != 0) {
            return error.FifoCreationFailed;
        }
    }

    fn openPipe(path: []const u8) !fs.File {
        try fs.accessAbsolute(path, .{ .mode = .read_write });
        const file = try fs.openFileAbsolute(path, .{
            .mode = .read_write,
        });

        // Zig doesn't set the nonblocking flag correctly, so we have to do it manually.
        const utils = @import("utils.zig");
        utils.setNonBlocking(file.handle);

        return file;
    }

    pub fn openRead(allocator: Allocator, path: []const u8) !Reader {
        const file = try openPipe(path);
        return Reader.init(file, allocator);
    }

    pub fn openWrite(allocator: Allocator, path: []const u8) !Writer {
        const file = try openPipe(path);
        return Writer.init(file, allocator);
    }
};

pub fn sendCmd(allocator: Allocator, cmd: Command) !void {
    var writer = try UnixPipe.openWrite(allocator, shared.RUNNER_CTL_FIFO);
    defer writer.deinit();
    try writer.sendCmd(cmd);

    var reader = try UnixPipe.openRead(allocator, shared.RUNNER_ACK_FIFO);
    defer reader.deinit();
    try reader.waitForAck(null);
}

pub fn sendVersion(allocator: Allocator, protocol_version: u64) !void {
    const cmd = Command{ .SetVersion = protocol_version };
    try sendCmd(allocator, cmd);
}

test "fail if doesn't exist" {
    const allocator = std.testing.allocator;

    const nonexistent_path = "/tmp/nonexistent_pipe_test.fifo";

    // Ensure it doesn't exist
    fs.deleteFileAbsolute(nonexistent_path) catch {};

    // Attempt to open for reading should fail
    const reader_result = UnixPipe.openRead(allocator, nonexistent_path);
    try std.testing.expectError(error.FileNotFound, reader_result);

    // Attempt to open for writing should fail
    const writer_result = UnixPipe.openWrite(allocator, nonexistent_path);
    try std.testing.expectError(error.FileNotFound, writer_result);

    // Attempt to send cmd to runner fifo
    fs.deleteFileAbsolute(shared.RUNNER_ACK_FIFO) catch {};
    fs.deleteFileAbsolute(shared.RUNNER_CTL_FIFO) catch {};

    const sendcmd_result = sendCmd(allocator, Command.StartBenchmark);
    try std.testing.expectError(error.FileNotFound, sendcmd_result);
}

test "unix pipe write read" {
    const allocator = std.testing.allocator;
    const test_path = "/tmp/test1.fifo";

    try UnixPipe.create(test_path);

    var reader = try UnixPipe.openRead(allocator, test_path);
    defer reader.deinit();

    var writer = try UnixPipe.openWrite(allocator, test_path);
    defer writer.deinit();

    const message = "Hello";
    try writer.writeAll(message);

    var buffer: [5]u8 = undefined;
    _ = try reader.readAll(&buffer);

    try std.testing.expectEqualStrings(message, &buffer);
}

test "unix pipe send recv cmd" {
    const allocator = std.testing.allocator;
    const test_path = "/tmp/test2.fifo";

    try UnixPipe.create(test_path);

    var reader = try UnixPipe.openRead(allocator, test_path);
    defer reader.deinit();

    var writer = try UnixPipe.openWrite(allocator, test_path);
    defer writer.deinit();

    try writer.sendCmd(Command.StartBenchmark);
    const cmd = try reader.recvCmd();
    defer cmd.deinit(writer.allocator);

    try std.testing.expectEqual(Command.StartBenchmark, cmd);
}

test "unix pipe send without ack" {
    const allocator = std.testing.allocator;
    const test_path = "/tmp/test_no_ack.fifo";

    try UnixPipe.create(test_path);

    // Open both reader and writer so they don't block on open
    var reader = try UnixPipe.openRead(allocator, test_path);
    defer reader.deinit();

    var writer = try UnixPipe.openWrite(allocator, test_path);
    defer writer.deinit();

    // Writer doesn't send anything, so waitForResponse should timeout
    const result = reader.waitForResponse(std.time.ns_per_ms * 100);
    try std.testing.expectError(error.AckTimeout, result);
}

test "unix pipe prevents stale messages between connections" {
    const allocator = std.testing.allocator;
    const test_path = "/tmp/test_stale_messages.fifo";

    try UnixPipe.create(test_path);

    // Keep writer open throughout to maintain the FIFO
    var writer = try UnixPipe.openWrite(allocator, test_path);
    defer writer.deinit();

    // STEP 1: Simulate first connection
    {
        var first_reader = try UnixPipe.openRead(allocator, test_path);

        // Send and successfully read first command
        try writer.sendCmd(Command.StartBenchmark);
        const cmd1 = try first_reader.recvCmd();
        defer cmd1.deinit(allocator);
        try std.testing.expect(cmd1.equal(Command.StartBenchmark));

        // Send second command but DON'T read it
        try writer.sendCmd(Command.StopBenchmark);

        // Close first reader WITHOUT reading the second command
        // This should drain the unread StopBenchmark message
        first_reader.deinit();
    }

    // STEP 2: Simulate second connection
    {
        var second_reader = try UnixPipe.openRead(allocator, test_path);
        defer second_reader.deinit();

        // Send fresh command
        try writer.sendCmd(Command.Ack);

        // This should read the fresh Ack, NOT the stale StopBenchmark
        const cmd2 = try second_reader.recvCmd();
        defer cmd2.deinit(allocator);

        // CRITICAL ASSERTION: We should receive the fresh Ack
        // Without the drain logic, this would fail with StopBenchmark
        try std.testing.expect(cmd2.equal(Command.Ack));
    }
}