Skip to main content

GENERATE_HEERID_SQL

Constant GENERATE_HEERID_SQL 

Source
/// PostgreSQL install script for the HeerId generator functions.
///
/// Running the script creates five functions under unqualified names:
///
/// * `generate_ids(in_node_id, requested_count, allow_spanning DEFAULT true)`:
///   the PL/pgSQL core. It is created from inside a `DO` block through
///   `format()`, so that its `search_path` is set to `current_schema()`
///   (falling back to `public`) followed by `pg_catalog`.
/// * `generate_ids(requested_count, allow_spanning)`,
///   `generate_ids(requested_count)`, `generate_id(in_node_id)` and
///   `generate_id()`: `LANGUAGE sql` wrappers that delegate to the core and
///   call `current_heer_node_id()` when no node id is passed in.
///
/// Every id is assembled as `(tick << 22) | (node_id << 13) | sequence`, where
/// `tick` is `clock_timestamp()` in milliseconds minus the epoch stored in
/// `heer_config` row `id = 1`, `node_id` is validated to `0..511`, and at most
/// 8192 sequence values (`0..8191`) are handed out per tick.
///
/// The core raises with custom SQLSTATEs when the stored `last_id_time` is
/// ahead of the clock: `50021` for 1 ms, `50020` for 2..49 ms, and `50022`
/// for 50 ms or more.
///
/// The script references, but does not create, the tables `heer_nodes`,
/// `heer_config` and `heer_node_state` and the function
/// `current_heer_node_id()`.
///
/// The value is spelled as `concat!` over raw string pieces purely so the SQL
/// can be read as SQL and annotated from the Rust side; the resulting string
/// is a single contiguous script.
pub const GENERATE_HEERID_SQL: &str = concat!(
    // Installer prologue and signature of the core function. `%I` is the
    // `format()` placeholder that receives `_sch` as a quoted identifier.
    r#"DO $install$
DECLARE
    _sch text := COALESCE(current_schema(), 'public');
BEGIN
    EXECUTE format($sql$
CREATE OR REPLACE FUNCTION generate_ids(
    in_node_id INTEGER,
    requested_count INTEGER,
    allow_spanning BOOLEAN DEFAULT true
)
RETURNS TABLE(id BIGINT)
LANGUAGE plpgsql
SET search_path = %I, pg_catalog
AS $func$
"#,
    // Locals, argument validation, and lookup of the configured epoch.
    // Every `%` inside a RAISE message is doubled (`%%`) because this text is
    // itself the template argument of `format()`.
    r#"DECLARE
    epoch_ms BIGINT;
    now_ms BIGINT;
    last_time BIGINT;
    last_sequence INTEGER;
    current_tick BIGINT;
    next_sequence INTEGER;
    remaining INTEGER;
    available_this_tick INTEGER;
    emit_count INTEGER;
    last_emitted_time BIGINT;
    last_emitted_sequence INTEGER;
    rollback_ms BIGINT;
BEGIN
    IF requested_count IS NULL OR requested_count <= 0 THEN
        RAISE EXCEPTION 'requested_count must be greater than zero';
    END IF;

    IF in_node_id IS NULL OR in_node_id < 0 OR in_node_id > 511 THEN
        RAISE EXCEPTION 'node_id %% is out of range for HeerId (0..511)', in_node_id;
    END IF;

    IF NOT EXISTS (
        SELECT 1 FROM heer_nodes WHERE node_id = in_node_id AND is_active = true
    ) THEN
        RAISE EXCEPTION 'node_id %% is not registered as an active Heer node', in_node_id;
    END IF;

    SELECT FLOOR(EXTRACT(EPOCH FROM c.epoch) * 1000)::BIGINT
    INTO epoch_ms
    FROM heer_config AS c
    WHERE c.id = 1;

    IF epoch_ms IS NULL THEN
        RAISE EXCEPTION 'heer_config row id=1 must exist before generating IDs';
    END IF;

"#,
    // Ensure the per-node state row exists, then lock it with FOR UPDATE
    // before the clock is read.
    r#"    INSERT INTO heer_node_state (node_id)
    VALUES (in_node_id)
    ON CONFLICT (node_id) DO NOTHING;

    SELECT s.last_id_time, s.last_sequence
    INTO last_time, last_sequence
    FROM heer_node_state AS s
    WHERE s.node_id = in_node_id
    FOR UPDATE;

    -- Calculate current time AFTER acquiring the lock to avoid false clock rollback
    -- under concurrency (another thread may have advanced last_id_time while we waited)
    now_ms := "#,
    // The constant's value has a line break between `:=` and the expression
    // here; it is kept verbatim. This piece also classifies how far the
    // stored time is ahead of the clock and raises the matching SQLSTATE.
    r#"
FLOOR(EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::BIGINT - epoch_ms;

    rollback_ms := last_time - now_ms;
    IF rollback_ms > 0 THEN
        IF rollback_ms < 2 THEN
            RAISE EXCEPTION 'logical future drift for node %% (%% ms) — likely batch-induced, check batch sizing', in_node_id, rollback_ms
                USING ERRCODE = '50021';
        ELSIF rollback_ms < 50 THEN
            RAISE EXCEPTION 'clock rollback detected for node %% (%% ms)', in_node_id, rollback_ms
                USING ERRCODE = '50020';
        ELSE
            RAISE EXCEPTION 'hard clock rollback detected for node %% (%% ms)', in_node_id, rollback_ms
                USING ERRCODE = '50022';
        END IF;
    END IF;

"#,
    // Pick the starting tick and sequence, and reject the request up front
    // when spanning is disallowed and the tick cannot hold the whole batch.
    r#"    current_tick := GREATEST(now_ms, last_time);
    next_sequence := CASE
        WHEN current_tick = last_time THEN last_sequence + 1
        ELSE 0
    END;

    available_this_tick := 8192 - next_sequence;
    IF NOT allow_spanning AND requested_count > available_this_tick THEN
        RAISE EXCEPTION
            'requested %% IDs but only %% remain in millisecond %% for node %%',
            requested_count,
            available_this_tick,
            current_tick,
            in_node_id;
    END IF;

"#,
    // Emission loop: hand out what fits in the current tick, then move on to
    // the next tick with the sequence restarted at 0 until the request is
    // satisfied.
    r#"    remaining := requested_count;

    WHILE remaining > 0 LOOP
        available_this_tick := 8192 - next_sequence;
        emit_count := LEAST(remaining, available_this_tick);

        RETURN QUERY
        SELECT
            (
                (current_tick::BIGINT << 22)
                | (in_node_id::BIGINT << 13)
                | series.sequence::BIGINT
            ) AS id
        FROM generate_series(next_sequence, next_sequence + emit_count - 1) AS series(sequence);

        last_emitted_time := current_tick;
        last_emitted_sequence := next_sequence + emit_count - 1;
        remaining := remaining - emit_count;
        current_tick := current_tick + 1;
        next_sequence := "#,
    // As above, the value has a line break between `:=` and `0;`; kept
    // verbatim. After the loop the last emitted (time, sequence) pair is
    // written back to the locked state row, and the installer block closes.
    r#"
0;
    END LOOP;

    UPDATE heer_node_state
    SET last_id_time = last_emitted_time,
        last_sequence = last_emitted_sequence,
        updated_at = CURRENT_TIMESTAMP
    WHERE node_id = in_node_id;
END;
$func$;
$sql$, _sch);
END;
$install$;

"#,
    // Convenience wrappers in plain SQL. These are created outside the DO
    // block and carry no search_path setting of their own.
    r#"CREATE OR REPLACE FUNCTION generate_ids(
    requested_count INTEGER,
    allow_spanning BOOLEAN
)
RETURNS TABLE(id BIGINT)
LANGUAGE sql
AS $$
    SELECT id
    FROM generate_ids(current_heer_node_id(), requested_count, allow_spanning);
$$;

CREATE OR REPLACE FUNCTION generate_ids(requested_count INTEGER)
RETURNS TABLE(id BIGINT)
LANGUAGE sql
AS $$
    SELECT id
    FROM generate_ids(current_heer_node_id(), requested_count, true);
$$;

CREATE OR REPLACE FUNCTION generate_id(in_node_id INTEGER)
RETURNS BIGINT
LANGUAGE sql
AS $$
    SELECT id
    FROM generate_ids(in_node_id, 1, true);
$$;

CREATE OR REPLACE FUNCTION generate_id()
RETURNS BIGINT
LANGUAGE sql
AS $$
    SELECT id
    FROM generate_ids(current_heer_node_id(), 1, true);
$$;
"#,
);