local stream_type = KEYS[1]
local source_id = KEYS[2]
local expected_version = tonumber(ARGV[1])
local source_stream = string.format("%s.%s", stream_type, source_id)
local current_version = tonumber(redis.call("XLEN", source_stream))
local last_version = current_version
local next_sequence_number = redis.call("XLEN", stream_type)
if expected_version > -1 and expected_version ~= current_version then
return redis.error_reply("expected version: " .. expected_version .. ", current version: " .. current_version)
end
for i, event in pairs({unpack(ARGV, 2)}) do
local version = current_version + i local sequence_number = next_sequence_number + i - 1
redis.call(
"XADD",
source_stream,
string.format("%d-%d", version, sequence_number),
"event", event
)
redis.call(
"XADD",
stream_type,
string.format("%d-%d", sequence_number, version),
"source_id", source_id,
"event", event
)
redis.call(
"PUBLISH",
stream_type,
string.format(
"{\"source_id\":\"%s\",\"sequence_number\":%d,\"version\":%d,\"event\":%s}",
source_id, sequence_number, version, event )
)
last_version = version
end
return last_version