const std = @import("std");
const fifo = @import("fifo.zig");
const shared = @import("shared.zig");
const logger = @import("logger.zig");
/// Protocol version this library announces to the runner
/// (sent by `RunnerFifo.validate_protocol_version`).
///
/// History:
/// - v1: Initial release
/// - v2: Added GetIntegrationMode
pub const PROTOCOL_VERSION: u64 = 2;
/// Client end of the runner FIFO pair: commands are written to
/// `shared.RUNNER_CTL_FIFO` and replies are read from `shared.RUNNER_ACK_FIFO`.
///
/// Typical usage: `var f = try RunnerFifo.init(allocator); defer f.deinit();`
pub const RunnerFifo = struct {
    /// Allocator handed to the pipes and used to free decoded responses.
    allocator: std.mem.Allocator,
    /// Write end of the control FIFO (commands going to the runner).
    writer: fifo.UnixPipe.Writer,
    /// Read end of the ack FIFO (replies coming from the runner).
    reader: fifo.UnixPipe.Reader,

    const Self = @This();

    /// Opens the control FIFO for writing, then the ack FIFO for reading.
    ///
    /// Returns whatever error `openWrite` / `openRead` report. If opening the
    /// reader fails, the already-opened writer is released before returning.
    pub fn init(allocator: std.mem.Allocator) !Self {
        var runner: Self = .{
            .allocator = allocator,
            .writer = try fifo.UnixPipe.openWrite(allocator, shared.RUNNER_CTL_FIFO),
            .reader = undefined, // filled in below, once the writer is guarded
        };
        // Without this, a failing `openRead` would leak the writer.
        errdefer runner.writer.deinit();

        runner.reader = try fifo.UnixPipe.openRead(allocator, shared.RUNNER_ACK_FIFO);
        return runner;
    }

    /// Announces `PROTOCOL_VERSION` to the runner.
    ///
    /// - `error.AckTimeout`: treated as "no runner present"; returns normally.
    /// - `error.UnexpectedError`: treated as a version rejection; logs and
    ///   terminates the process with exit code 1.
    /// - any other error: logs the error name and terminates the process with
    ///   exit code 1.
    ///
    /// The error union in the signature is kept for interface compatibility;
    /// every failure path above either returns normally or exits.
    pub fn validate_protocol_version(self: *Self) !void {
        self.send_version(PROTOCOL_VERSION) catch |err| {
            switch (err) {
                // No runner present - silently continue as NOP
                error.AckTimeout => return,
                // Runner explicitly rejected version
                error.UnexpectedError => {
                    logger.err("instrument-hooks: CodSpeed runner rejected protocol version {}\n", .{PROTOCOL_VERSION});
                    logger.err("instrument-hooks: please update the CodSpeed action to the latest version\n", .{});
                    std.posix.exit(1);
                },
                // All other errors - log and exit
                else => {
                    logger.err("instrument-hooks: error {s} during version check\n", .{@errorName(err)});
                    std.posix.exit(1);
                },
            }
        };
    }

    /// Releases both pipe ends (writer first, then reader).
    pub fn deinit(self: *Self) void {
        self.writer.deinit();
        self.reader.deinit();
    }

    /// Writes `cmd` to the control FIFO and then waits for an ack on the ack
    /// FIFO. `null` is passed as the wait argument; its exact meaning is
    /// defined by `fifo.UnixPipe.Reader.waitForAck`.
    pub fn send_cmd(self: *Self, cmd: fifo.Command) !void {
        try self.writer.sendCmd(cmd);
        try self.reader.waitForAck(null);
    }

    /// Sends `PingPerf`; returns `true` if it was sent and acknowledged and
    /// `false` on any error (the error itself is intentionally discarded).
    pub fn ping_perf(self: *Self) bool {
        self.send_cmd(fifo.Command.PingPerf) catch {
            return false;
        };
        return true;
    }

    /// Sends `StartBenchmark` and waits for the ack.
    ///
    /// Deliberately `noinline` and written without going through `send_cmd`.
    /// NOTE(review): presumably this keeps a distinct, stable frame for the
    /// benchmark boundary - confirm before refactoring.
    pub noinline fn start_benchmark(self: *Self) !void {
        @branchHint(.cold); // Prevent inline
        try self.writer.sendCmd(fifo.Command.StartBenchmark);
        try self.reader.waitForAck(null);
    }

    /// Sends `StopBenchmark` and waits for the ack. Mirrors `start_benchmark`,
    /// including the `noinline` / cold-hint treatment.
    pub noinline fn stop_benchmark(self: *Self) !void {
        @branchHint(.cold); // Prevent inline
        try self.writer.sendCmd(fifo.Command.StopBenchmark);
        try self.reader.waitForAck(null);
    }

    /// Sends `ExecutedBenchmark` carrying `pid` and `uri`, then waits for the ack.
    ///
    /// `uri` is a C pointer read up to its NUL terminator via `std.mem.span`.
    pub fn set_executed_benchmark(self: *Self, pid: u32, uri: [*c]const u8) !void {
        try self.send_cmd(.{ .ExecutedBenchmark = .{
            .pid = pid,
            .uri = std.mem.span(uri),
        } });
    }

    /// Sends `SetIntegration` carrying `name` and `version`, then waits for the ack.
    ///
    /// Both arguments are C pointers read up to their NUL terminator via
    /// `std.mem.span`.
    pub fn set_integration(self: *Self, name: [*c]const u8, version: [*c]const u8) !void {
        try self.send_cmd(.{ .SetIntegration = .{
            .name = std.mem.span(name),
            .version = std.mem.span(version),
        } });
    }

    /// Sends `AddMarker` carrying `pid` and `marker`, then waits for the ack.
    pub fn add_marker(self: *Self, pid: u32, marker: shared.MarkerType) !void {
        try self.send_cmd(.{ .AddMarker = .{
            .pid = pid,
            .marker = marker,
        } });
    }

    /// Sends `SetVersion` carrying `protocol_version`, then waits for the ack.
    pub fn send_version(self: *Self, protocol_version: u64) !void {
        try self.send_cmd(.{ .SetVersion = protocol_version });
    }

    /// Asks the runner for its integration mode.
    ///
    /// Returns the payload of an `IntegrationModeResponse` reply, or
    /// `error.UnexpectedResponse` if the runner replied with anything else.
    /// The decoded response is released with `self.allocator` before returning.
    pub fn get_integration_mode(self: *Self) !shared.IntegrationMode {
        // NOTE: Other messages send data to the runner and expect a bare ACK
        // (see `send_cmd`). This command expects the runner to answer with
        // data, so it writes and reads the pipes directly.
        try self.writer.sendCmd(fifo.Command.GetIntegrationMode);
        const response = try self.reader.waitForResponse(null);
        defer response.deinit(self.allocator);

        if (response == .IntegrationModeResponse) {
            return response.IntegrationModeResponse;
        }
        return error.UnexpectedResponse;
    }
};
// Round-trip test: a fake "runner" thread reads each command from the control
// FIFO and answers with an Ack, while the main thread drives `RunnerFifo`.
test "test runner fifo" {
    const allocator = std.testing.allocator;

    try fifo.UnixPipe.create(shared.RUNNER_ACK_FIFO);
    try fifo.UnixPipe.create(shared.RUNNER_CTL_FIFO);

    // The test plays the runner, so it holds the opposite ends of both pipes.
    var ctl_fifo = try fifo.UnixPipe.openRead(allocator, shared.RUNNER_CTL_FIFO);
    defer ctl_fifo.deinit();
    var ack_fifo = try fifo.UnixPipe.openWrite(allocator, shared.RUNNER_ACK_FIFO);
    defer ack_fifo.deinit();

    const FifoTester = struct {
        allocator: std.mem.Allocator,
        ctl_pipe: *fifo.UnixPipe.Reader,
        ack_pipe: *fifo.UnixPipe.Writer,
        received_cmd: ?fifo.Command = null,
        error_occurred: bool = false,

        /// Receiver thread body: reads one command, stores it, replies with Ack.
        /// Failures are reported through `error_occurred`.
        pub fn func(ctx: *@This()) void {
            const received_cmd = ctx.ctl_pipe.waitForResponse(null) catch |err| {
                std.debug.print("Failed to receive command: {}\n", .{err});
                ctx.error_occurred = true;
                return;
            };
            ctx.received_cmd = received_cmd;
            ctx.ack_pipe.sendCmd(fifo.Command.Ack) catch |err| {
                std.debug.print("Failed to send ACK: {}\n", .{err});
                ctx.error_occurred = true;
            };
        }

        /// Runs `f(args...)` while a receiver thread answers it, and returns
        /// the command the receiver saw. The caller owns the returned command
        /// and must `deinit` it.
        pub fn send(self: *@This(), comptime f: anytype, args: anytype) !fifo.Command {
            // Start every round from a clean slate so one failed round cannot
            // poison the next and a stale command is never returned.
            self.received_cmd = null;
            self.error_occurred = false;

            // 1. Create the thread which handles the command
            const receiver_thread = try std.Thread.spawn(.{}, @This().func, .{self});

            // 2. Execute the callback, but hold on to its result for now...
            const call_result = @call(.auto, f, args);

            // 3. ...so the thread is always joined, even when the callback
            //    failed. Otherwise it would keep using `self` and the pipes
            //    after the test has unwound.
            // NOTE(review): this relies on `waitForResponse(null)` eventually
            // returning when nothing is written - confirm in fifo.zig.
            receiver_thread.join();

            // On any failure below, release a command the receiver may have
            // decoded so the testing allocator does not report a leak.
            errdefer {
                if (self.received_cmd) |cmd| {
                    cmd.deinit(self.allocator);
                    self.received_cmd = null;
                }
            }

            try call_result;
            if (self.error_occurred) {
                return error.IntegrationError;
            }

            const cmd = self.received_cmd orelse return error.IntegrationError;
            self.received_cmd = null; // ownership moves to the caller
            return cmd;
        }
    };

    var tester = FifoTester{
        .allocator = allocator,
        .ctl_pipe = &ctl_fifo,
        .ack_pipe = &ack_fifo,
    };

    var runner_fifo = try RunnerFifo.init(allocator);
    defer runner_fifo.deinit();

    // Each result is released via `defer` so a failing expectation does not
    // also leak the received command.
    const si_result = try tester.send(RunnerFifo.set_integration, .{ &runner_fifo, "zig", "0.10.0" });
    defer si_result.deinit(allocator);
    try std.testing.expect(si_result.equal(fifo.Command{ .SetIntegration = .{ .name = "zig", .version = "0.10.0" } }));

    const cb_result = try tester.send(RunnerFifo.set_executed_benchmark, .{ &runner_fifo, 42, "foo" });
    defer cb_result.deinit(allocator);
    try std.testing.expect(cb_result.equal(fifo.Command{ .ExecutedBenchmark = .{ .pid = 42, .uri = "foo" } }));

    const start_result = try tester.send(RunnerFifo.start_benchmark, .{&runner_fifo});
    defer start_result.deinit(allocator);
    try std.testing.expect(start_result.equal(fifo.Command.StartBenchmark));

    const stop_result = try tester.send(RunnerFifo.stop_benchmark, .{&runner_fifo});
    defer stop_result.deinit(allocator);
    try std.testing.expect(stop_result.equal(fifo.Command.StopBenchmark));
}