codspeed 4.7.0

Core instrumentation library for CodSpeed
Documentation
const bincode = @import("../bincode.zig");
const std = @import("std");
const shared = @import("../shared.zig");

const fs = std.fs;
const os = std.os;
const mem = std.mem;
const Allocator = std.mem.Allocator;
const Path = []const u8;
pub const Command = shared.Command;

extern "c" fn mkfifo(path: [*:0]const u8, mode: c_uint) c_int;

/// Wait until `fd` is readable, returning `error.AckTimeout` if no data
/// arrives within `timeout_ns`. Used to gate blocking reads so we never
/// consume bytes from a partially-written FIFO frame and lose framing.
fn waitReadable(fd: std.posix.fd_t, timeout_ns: u64) !void {
    var pfd = [_]std.posix.pollfd{.{
        .fd = fd,
        .events = std.posix.POLL.IN,
        .revents = 0,
    }};
    const ms_total = timeout_ns / std.time.ns_per_ms;
    const timeout_ms: i32 = if (ms_total > std.math.maxInt(i32))
        std.math.maxInt(i32)
    else
        @intCast(ms_total);
    const ready = std.posix.poll(&pfd, timeout_ms) catch return error.AckTimeout;
    if (ready == 0) return error.AckTimeout;
}

pub const Pipe = struct {
    pub const Reader = struct {
        file: fs.File,
        allocator: Allocator,
        buffer: std.ArrayList(u8),

        pub fn init(file: fs.File, allocator: Allocator) Reader {
            var buffer = std.ArrayList(u8).init(allocator);
            // Pre-allocate 1KB to avoid allocations for typical command sizes
            buffer.ensureTotalCapacity(1024) catch {};
            return .{
                .file = file,
                .allocator = allocator,
                .buffer = buffer,
            };
        }

        pub fn read(self: *Reader, buffer: []u8) !usize {
            return self.file.read(buffer);
        }

        pub fn readAll(self: *Reader, buffer: []u8) !usize {
            return self.file.readAll(buffer);
        }

        // IMPORTANT: Caller is responsible for freeing the returned command.
        pub fn recvCmd(self: *Reader) !Command {
            // First read the length (u32 = 4 bytes)
            var len_buffer: [4]u8 = undefined;
            const len_read = self.file.readAll(&len_buffer) catch |err| {
                // Convert blocking/broken pipe errors to something the timeout logic can handle
                return switch (err) {
                    error.WouldBlock, error.BrokenPipe => error.NotReady,
                    else => err,
                };
            };
            if (len_read < 4) {
                return error.UnexpectedEof;
            }
            const message_len = std.mem.readInt(u32, &len_buffer, std.builtin.Endian.little);

            // Resize buffer to fit message (only allocates if growing)
            try self.buffer.resize(message_len);

            const msg_read = self.file.readAll(self.buffer.items) catch |err| {
                return switch (err) {
                    error.WouldBlock, error.BrokenPipe => error.NotReady,
                    else => err,
                };
            };
            if (msg_read < message_len) {
                return error.UnexpectedEof;
            }

            var stream = std.io.fixedBufferStream(self.buffer.items);
            return try bincode.deserializeAlloc(stream.reader(), self.allocator, Command);
        }

        pub fn waitForResponse(self: *Reader, timeout_ns: ?u64) !Command {
            const timeout = timeout_ns orelse std.time.ns_per_s * 1; // Default 1 second timeout
            try waitReadable(self.file.handle, timeout);
            return self.recvCmd();
        }

        pub fn waitForAck(self: *Reader, timeout_ns: ?u64) !void {
            const response = try self.waitForResponse(timeout_ns);
            defer response.deinit(self.allocator);

            switch (response) {
                .Ack => return,
                .Err => return error.UnexpectedError,
                else => {
                    const logger = @import("../logger.zig");
                    logger.debug("waitForAck received unexpected response: {}\n", .{response});
                    return error.UnexpectedResponse;
                },
            }
        }

        pub fn deinit(self: *Reader) void {
            // Drain any pending data from the FIFO before closing to prevent
            // stale messages from being read by subsequent connections.
            // This is crucial when multiple instrument types probe the same FIFO
            // (e.g., AnalysisInstrument fails, then WalltimeInstrument tries).
            // The fd is blocking, so poll with timeout=0 to consume only what's
            // currently available without ever blocking on an empty FIFO.
            var dummy_buffer: [4096]u8 = undefined;
            while (true) {
                var pfd = [_]std.posix.pollfd{.{
                    .fd = self.file.handle,
                    .events = std.posix.POLL.IN,
                    .revents = 0,
                }};
                const ready = std.posix.poll(&pfd, 0) catch break;
                if (ready == 0) break;
                const bytes_read = self.file.read(&dummy_buffer) catch break;
                if (bytes_read == 0) break;
            }

            self.buffer.deinit();
            self.file.close();
        }
    };

    pub const Writer = struct {
        file: fs.File,
        allocator: Allocator,
        buffer: std.ArrayList(u8),

        pub fn init(file: fs.File, allocator: Allocator) Writer {
            var buffer = std.ArrayList(u8).init(allocator);
            // Pre-allocate 1KB to avoid allocations for typical command sizes
            buffer.ensureTotalCapacity(1024) catch {};
            return .{
                .file = file,
                .allocator = allocator,
                .buffer = buffer,
            };
        }

        pub fn write(self: *Writer, buffer: []const u8) !usize {
            return self.file.write(buffer);
        }

        pub fn writeAll(self: *Writer, buffer: []const u8) !void {
            return self.file.writeAll(buffer);
        }

        pub fn sendCmd(self: *Writer, cmd: Command) !void {
            // Clear buffer but keep allocated capacity
            self.buffer.clearRetainingCapacity();

            try bincode.serialize(self.buffer.writer(), cmd);

            const bytes = self.buffer.items;
            try self.file.writeAll(std.mem.asBytes(&@as(u32, @intCast(bytes.len))));
            try self.file.writeAll(bytes);
        }

        pub fn deinit(self: *Writer) void {
            self.buffer.deinit();
            self.file.close();
        }
    };

    /// Create a new named pipe at the given path
    pub fn create(path: [*:0]const u8) !void {
        // Remove the previous FIFO (if it exists)
        fs.deleteFileAbsolute(std.mem.span(path)) catch {};

        if (mkfifo(path, 0o700) != 0) {
            return error.FifoCreationFailed;
        }
    }

    fn openPipe(path: []const u8) !fs.File {
        try fs.accessAbsolute(path, .{ .mode = .read_write });
        return try fs.openFileAbsolute(path, .{
            .mode = .read_write,
        });
    }

    pub fn openRead(allocator: Allocator, path: []const u8) !Reader {
        const file = try openPipe(path);
        return Reader.init(file, allocator);
    }

    pub fn openWrite(allocator: Allocator, path: []const u8) !Writer {
        const file = try openPipe(path);
        return Writer.init(file, allocator);
    }
};

pub fn sendCmd(allocator: Allocator, cmd: Command) !void {
    var writer = try Pipe.openWrite(allocator, shared.RUNNER_CTL_FIFO);
    defer writer.deinit();
    try writer.sendCmd(cmd);

    var reader = try Pipe.openRead(allocator, shared.RUNNER_ACK_FIFO);
    defer reader.deinit();
    try reader.waitForAck(null);
}

pub fn sendVersion(allocator: Allocator, protocol_version: u64) !void {
    const cmd = Command{ .SetVersion = protocol_version };
    try sendCmd(allocator, cmd);
}

test "fail if doesn't exist" {
    const allocator = std.testing.allocator;

    const nonexistent_path = "/tmp/nonexistent_pipe_test.fifo";

    // Ensure it doesn't exist
    fs.deleteFileAbsolute(nonexistent_path) catch {};

    // Attempt to open for reading should fail
    const reader_result = Pipe.openRead(allocator, nonexistent_path);
    try std.testing.expectError(error.FileNotFound, reader_result);

    // Attempt to open for writing should fail
    const writer_result = Pipe.openWrite(allocator, nonexistent_path);
    try std.testing.expectError(error.FileNotFound, writer_result);

    // Attempt to send cmd to runner fifo
    fs.deleteFileAbsolute(shared.RUNNER_ACK_FIFO) catch {};
    fs.deleteFileAbsolute(shared.RUNNER_CTL_FIFO) catch {};

    const sendcmd_result = sendCmd(allocator, Command.StartBenchmark);
    try std.testing.expectError(error.FileNotFound, sendcmd_result);
}

test "unix pipe write read" {
    const allocator = std.testing.allocator;
    const test_path = "/tmp/test1.fifo";

    try Pipe.create(test_path);

    var reader = try Pipe.openRead(allocator, test_path);
    defer reader.deinit();

    var writer = try Pipe.openWrite(allocator, test_path);
    defer writer.deinit();

    const message = "Hello";
    try writer.writeAll(message);

    var buffer: [5]u8 = undefined;
    _ = try reader.readAll(&buffer);

    try std.testing.expectEqualStrings(message, &buffer);
}

test "unix pipe send recv cmd" {
    const allocator = std.testing.allocator;
    const test_path = "/tmp/test2.fifo";

    try Pipe.create(test_path);

    var reader = try Pipe.openRead(allocator, test_path);
    defer reader.deinit();

    var writer = try Pipe.openWrite(allocator, test_path);
    defer writer.deinit();

    try writer.sendCmd(Command.StartBenchmark);
    const cmd = try reader.recvCmd();
    defer cmd.deinit(writer.allocator);

    try std.testing.expectEqual(Command.StartBenchmark, cmd);
}

test "unix pipe send without ack" {
    const allocator = std.testing.allocator;
    const test_path = "/tmp/test_no_ack.fifo";

    try Pipe.create(test_path);

    // Open both reader and writer so they don't block on open
    var reader = try Pipe.openRead(allocator, test_path);
    defer reader.deinit();

    var writer = try Pipe.openWrite(allocator, test_path);
    defer writer.deinit();

    // Writer doesn't send anything, so waitForResponse should timeout
    const result = reader.waitForResponse(std.time.ns_per_ms * 100);
    try std.testing.expectError(error.AckTimeout, result);
}

test "unix pipe prevents stale messages between connections" {
    const allocator = std.testing.allocator;
    const test_path = "/tmp/test_stale_messages.fifo";

    try Pipe.create(test_path);

    // Keep writer open throughout to maintain the FIFO
    var writer = try Pipe.openWrite(allocator, test_path);
    defer writer.deinit();

    // STEP 1: Simulate first connection
    {
        var first_reader = try Pipe.openRead(allocator, test_path);

        // Send and successfully read first command
        try writer.sendCmd(Command.StartBenchmark);
        const cmd1 = try first_reader.recvCmd();
        defer cmd1.deinit(allocator);
        try std.testing.expect(cmd1.equal(Command.StartBenchmark));

        // Send second command but DON'T read it
        try writer.sendCmd(Command.StopBenchmark);

        // Close first reader WITHOUT reading the second command
        // This should drain the unread StopBenchmark message
        first_reader.deinit();
    }

    // STEP 2: Simulate second connection
    {
        var second_reader = try Pipe.openRead(allocator, test_path);
        defer second_reader.deinit();

        // Send fresh command
        try writer.sendCmd(Command.Ack);

        // This should read the fresh Ack, NOT the stale StopBenchmark
        const cmd2 = try second_reader.recvCmd();
        defer cmd2.deinit(allocator);

        // CRITICAL ASSERTION: We should receive the fresh Ack
        // Without the drain logic, this would fail with StopBenchmark
        try std.testing.expect(cmd2.equal(Command.Ack));
    }
}