apalis-redis 0.7.4

Redis Storage for apalis: use Redis for background jobs and message queueing
Documentation
-- KEYS[1]: the consumer set
-- KEYS[2]: the active job list
-- KEYS[3]: the signal list

-- ARGV[1]: the timestamp before which a consumer is considered expired
-- ARGV[2]: the max number of jobs to process in a given run

-- Returns: 0 if all orphaned jobs have been rescheduled, 1 if there are more to process

-- Find expired consumers
local consumers = redis.call("zrangebyscore", KEYS[1], 0, ARGV[1], "LIMIT", 0, ARGV[2])
redis.replicate_commands()
-- Pull jobs from the consumer's inflight set and reschedule up to limit
local limit = tonumber(ARGV[2])
for _,consumer in ipairs(consumers) do
  local jobs = redis.call("spop", consumer, limit)
  local count = table.getn(jobs)

  -- Push any orphaned jobs on to the message list
  if count > 0 then
    redis.call("rpush", KEYS[2], unpack(jobs))
  end

  -- Delete the consumer if all of its jobs have been rescheduled
  if count < limit then
    redis.call("zrem", KEYS[1], consumer)
  end


  -- Don't keep looping if we can't process any more jobs
  limit = limit - count
  if limit <= 0 then
    break
  end
end

local processed = tonumber(ARGV[2]) - limit

if processed > 0 then
  -- Signal that there are jobs in the queue
  redis.call("del", KEYS[3])
  redis.call("lpush", KEYS[3], 1)
end

return processed