Skip to main content

CONFIGURE_SQL

Constant CONFIGURE_SQL 

Source
pub const CONFIGURE_SQL: &str = "DROP FUNCTION IF EXISTS heer_configure();\n\nDO $install$\nDECLARE\n    _sch text := COALESCE(current_schema(), \'public\');\nBEGIN\n    EXECUTE format($sql$\nCREATE OR REPLACE FUNCTION heer_configure(\n    force_reset_state BOOLEAN DEFAULT false\n)\nRETURNS VOID\nLANGUAGE plpgsql\nSET search_path = %I, pg_catalog\nAS $func$\nDECLARE\n    _sch             text := COALESCE(current_schema(), \'public\');\n    cfg_epoch        TIMESTAMP;\n    cfg_precision    VARCHAR(2);\n    cfg_offset       NUMERIC(30,0);\n    epoch_ms         BIGINT;\n    epoch_ticks      NUMERIC(30,0);\n    precision_bits   INTEGER;\n    multiplier       NUMERIC;\n    unit_name        TEXT;\n    max_ts_41        BIGINT;\n    max_ts_89        NUMERIC(30,0);\n    logical_threshold NUMERIC(30,0);\n    rollback_threshold NUMERIC(30,0);\n    smoke_heerid     BIGINT;\n    smoke_ranjid     UUID;\nBEGIN\n    -- ----------------------------------------------------------------\n    -- 1. Read config\n    -- ----------------------------------------------------------------\n    SELECT c.epoch, c.precision, c.ranj_epoch_offset\n    INTO   cfg_epoch, cfg_precision, cfg_offset\n    FROM   heer_config AS c\n    WHERE  c.id = 1;\n\n    IF NOT FOUND THEN\n        RAISE EXCEPTION \'heer_config row id=1 does not exist\';\n    END IF;\n\n    -- ----------------------------------------------------------------\n    -- 2. Validate\n    -- ----------------------------------------------------------------\n    IF cfg_epoch IS NULL THEN\n        RAISE EXCEPTION \'heer_config.epoch must not be NULL\';\n    END IF;\n\n    IF cfg_epoch > clock_timestamp() THEN\n        RAISE EXCEPTION \'heer_config.epoch (%%) is in the future\', cfg_epoch;\n    END IF;\n\n    IF cfg_precision NOT IN (\'us\', \'ns\', \'ps\', \'fs\') THEN\n        RAISE EXCEPTION \'heer_config.precision must be one of us, ns, ps, fs; got \"%%\"\', cfg_precision;\n    END IF;\n\n    -- Compute epoch in milliseconds for HeerId\n    epoch_ms := FLOOR(EXTRACT(EPOCH FROM cfg_epoch) * 1000)::BIGINT;\n\n    -- Compute multiplier and precision_bits based on precision\n    CASE cfg_precision\n        WHEN \'us\' THEN multiplier := 1000000;           precision_bits := 0; unit_name := \'microseconds\';\n        WHEN \'ns\' THEN multiplier := 1000000000;         precision_bits := 1; unit_name := \'nanoseconds\';\n        WHEN \'ps\' THEN multiplier := 1000000000000;      precision_bits := 2; unit_name := \'picoseconds\';\n        WHEN \'fs\' THEN multiplier := 1000000000000000;   precision_bits := 3; unit_name := \'femtoseconds\';\n    END CASE;\n\n    -- Compute epoch in target precision units for RanjId\n    epoch_ticks := FLOOR(EXTRACT(EPOCH FROM cfg_epoch) * multiplier)::NUMERIC(30,0);\n\n    -- Verify epoch fits in 41 bits for HeerId (max ~69 years from epoch)\n    max_ts_41 := (2::BIGINT ^ 41) - 1;\n    IF epoch_ms < 0 THEN\n        RAISE EXCEPTION \'HeerId epoch_ms is negative (%%); epoch too far in the past for BIGINT arithmetic\', epoch_ms;\n    END IF;\n\n    -- Verify epoch fits in 89 bits for RanjId\n    max_ts_89 := (2::NUMERIC ^ 89) - 1;\n    -- We just need epoch_ticks to be non-negative; the actual range check is on current_tick at runtime\n    IF epoch_ticks < 0 THEN\n        RAISE EXCEPTION \'RanjId epoch_ticks is negative; epoch is invalid\';\n    END IF;\n\n    -- Scale rollback thresholds from microseconds to ticks in the configured precision:\n    --   logical_threshold  = 2000 us  (likely batch-induced drift)\n    --   rollback_threshold = 50000 us (hard clock rollback boundary)\n    -- ticks = us * (multiplier / 1000000)\n    logical_threshold  := FLOOR(2000::NUMERIC  * multiplier / 1000000)::NUMERIC(30,0);\n    rollback_threshold := FLOOR(50000::NUMERIC * multiplier / 1000000)::NUMERIC(30,0);\n\n    -- ----------------------------------------------------------------\n    -- 3. Regenerate HeerId function\n    -- ----------------------------------------------------------------\n    EXECUTE format($fmt$\nCREATE OR REPLACE FUNCTION generate_ids(\n    in_node_id INTEGER,\n    requested_count INTEGER,\n    allow_spanning BOOLEAN DEFAULT true\n)\nRETURNS TABLE(id BIGINT)\nLANGUAGE plpgsql\nSET search_path = %%2$I, pg_catalog\nAS $generated$\nDECLARE\n    -- Epoch baked in by heer_configure()\n    epoch_ms CONSTANT BIGINT := %%1$s;\n    now_ms BIGINT;\n    last_time BIGINT;\n    last_sequence INTEGER;\n    current_tick BIGINT;\n    next_sequence INTEGER;\n    remaining INTEGER;\n    available_this_tick INTEGER;\n    emit_count INTEGER;\n    last_emitted_time BIGINT;\n    last_emitted_sequence INTEGER;\n    rollback_ms BIGINT;\nBEGIN\n    IF requested_count IS NULL OR requested_count <= 0 THEN\n        RAISE EXCEPTION \'requested_count must be greater than zero\';\n    END IF;\n\n    IF in_node_id IS NULL OR in_node_id < 0 OR in_node_id > 511 THEN\n        RAISE EXCEPTION \'node_id %%%% is out of range for HeerId (0..511)\', in_node_id;\n    END IF;\n\n    IF NOT EXISTS (\n        SELECT 1 FROM heer_nodes WHERE node_id = in_node_id AND is_active = true\n    ) THEN\n        RAISE EXCEPTION \'node_id %%%% is not registered as an active Heer node\', in_node_id;\n    END IF;\n\n    INSERT INTO heer_node_state (node_id)\n    VALUES (in_node_id)\n    ON CONFLICT (node_id) DO NOTHING;\n\n    SELECT s.last_id_time, s.last_sequence\n    INTO last_time, last_sequence\n    FROM heer_node_state AS s\n    WHERE s.node_id = in_node_id\n    FOR UPDATE;\n\n    -- Calculate current time AFTER acquiring the lock to avoid false clock rollback\n    -- under concurrency (another thread may have advanced last_id_time while we waited)\n    now_ms := FLOOR(EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::BIGINT - epoch_ms;\n\n    rollback_ms := last_time - now_ms;\n    IF rollback_ms > 0 THEN\n        IF rollback_ms < 2 THEN\n            RAISE EXCEPTION \'logical future drift for node %%%% (%%%% ms) \u{2014} likely batch-induced, check batch sizing\', in_node_id, rollback_ms\n                USING ERRCODE = \'50021\';\n        ELSIF rollback_ms < 50 THEN\n            RAISE EXCEPTION \'clock rollback detected for node %%%% (%%%% ms)\', in_node_id, rollback_ms\n                USING ERRCODE = \'50020\';\n        ELSE\n            RAISE EXCEPTION \'hard clock rollback detected for node %%%% (%%%% ms)\', in_node_id, rollback_ms\n                USING ERRCODE = \'50022\';\n        END IF;\n    END IF;\n\n    current_tick := GREATEST(now_ms, last_time);\n    next_sequence := CASE\n        WHEN current_tick = last_time THEN last_sequence + 1\n        ELSE 0\n    END;\n\n    available_this_tick := 8192 - next_sequence;\n    IF NOT allow_spanning AND requested_count > available_this_tick THEN\n        RAISE EXCEPTION\n            \'requested %%%% IDs but only %%%% remain in millisecond %%%% for node %%%%\',\n            requested_count,\n            available_this_tick,\n            current_tick,\n            in_node_id;\n    END IF;\n\n    remaining := requested_count;\n\n    WHILE remaining > 0 LOOP\n        available_this_tick := 8192 - next_sequence;\n        emit_count := LEAST(remaining, available_this_tick);\n\n        RETURN QUERY\n        SELECT\n            (\n                (current_tick::BIGINT << 22)\n                | (in_node_id::BIGINT << 13)\n                | series.sequence::BIGINT\n            ) AS id\n        FROM generate_series(next_sequence, next_sequence + emit_count - 1) AS series(sequence);\n\n        last_emitted_time := current_tick;\n        last_emitted_sequence := next_sequence + emit_count - 1;\n        remaining := remaining - emit_count;\n        current_tick := current_tick + 1;\n        next_sequence := 0;\n    END LOOP;\n\n    UPDATE heer_node_state\n    SET last_id_time = last_emitted_time,\n        last_sequence = last_emitted_sequence,\n        updated_at = CURRENT_TIMESTAMP\n    WHERE node_id = in_node_id;\nEND;\n$generated$\n    $fmt$, epoch_ms, _sch);\n\n    -- ----------------------------------------------------------------\n    -- 4. Regenerate RanjId function\n    -- ----------------------------------------------------------------\n    EXECUTE format($fmt$\nCREATE OR REPLACE FUNCTION generate_ranjids(\n    in_node_id INTEGER,\n    requested_count INTEGER,\n    allow_spanning BOOLEAN DEFAULT true\n)\nRETURNS TABLE(id UUID)\nLANGUAGE plpgsql\nSET search_path = %%8$I, pg_catalog\nAS $generated$\nDECLARE\n    -- Epoch and precision baked in by heer_configure()\n    epoch_ticks CONSTANT NUMERIC(30,0) := %%1$s;\n    -- epoch_offset is in microseconds; converted to ticks below\n    epoch_offset_us CONSTANT NUMERIC(30,0) := %%2$s;\n    precision_bits CONSTANT INTEGER := %%3$s;\n    -- Rollback thresholds baked in (scaled from microseconds to ticks)\n    logical_threshold CONSTANT NUMERIC(30,0) := %%4$s;\n    rollback_threshold CONSTANT NUMERIC(30,0) := %%5$s;\n    now_ticks NUMERIC(30,0);\n    last_time NUMERIC(30,0);\n    last_seq INTEGER;\n    current_tick NUMERIC(30,0);\n    next_seq INTEGER;\n    remaining INTEGER;\n    available_this_tick INTEGER;\n    emit_count INTEGER;\n    last_emitted_time NUMERIC(30,0);\n    last_emitted_seq INTEGER;\n    rollback_ticks NUMERIC(30,0);\n\n    ts_high BIGINT;\n    ts_mid BIGINT;\n    ts_low BIGINT;\n    hi BIGINT;\n    lo_base BIGINT;\nBEGIN\n    IF requested_count IS NULL OR requested_count <= 0 THEN\n        RAISE EXCEPTION \'requested_count must be greater than zero\';\n    END IF;\n\n    IF in_node_id IS NULL OR in_node_id < 0 OR in_node_id > 32767 THEN\n        RAISE EXCEPTION \'node_id %%%% is out of range for RanjId (0..32767)\', in_node_id;\n    END IF;\n\n    IF NOT EXISTS (\n        SELECT 1 FROM heer_nodes WHERE node_id = in_node_id AND is_active = true\n    ) THEN\n        RAISE EXCEPTION \'node_id %%%% is not registered as an active Heer node\', in_node_id;\n    END IF;\n\n    INSERT INTO heer_ranj_node_state (node_id)\n    VALUES (in_node_id)\n    ON CONFLICT (node_id) DO NOTHING;\n\n    SELECT s.last_id_time, s.last_sequence\n    INTO last_time, last_seq\n    FROM heer_ranj_node_state AS s\n    WHERE s.node_id = in_node_id\n    FOR UPDATE;\n\n    -- Calculate current time AFTER acquiring the lock to avoid false clock rollback\n    -- under concurrency (another thread may have advanced last_id_time while we waited)\n    -- epoch_offset is stored in microseconds; convert to ticks: ticks = us * multiplier / 1000000\n    -- current_tick = (now - epoch_ticks) + FLOOR(epoch_offset_us * multiplier / 1000000)\n    now_ticks := FLOOR(EXTRACT(EPOCH FROM clock_timestamp()) * %%6$s)::NUMERIC(30,0)\n                 - epoch_ticks\n                 + FLOOR(epoch_offset_us * %%7$s / 1000000)::NUMERIC(30,0);\n\n    rollback_ticks := last_time - now_ticks;\n    IF rollback_ticks > 0 THEN\n        IF rollback_ticks < logical_threshold THEN\n            RAISE EXCEPTION \'logical future drift for ranj node %%%% (%%%% ticks)\', in_node_id, rollback_ticks\n                USING ERRCODE = \'50021\';\n        ELSIF rollback_ticks < rollback_threshold THEN\n            RAISE EXCEPTION \'clock rollback detected for ranj node %%%% (%%%% ticks)\', in_node_id, rollback_ticks\n                USING ERRCODE = \'50020\';\n        ELSE\n            RAISE EXCEPTION \'hard clock rollback detected for ranj node %%%% (%%%% ticks)\', in_node_id, rollback_ticks\n                USING ERRCODE = \'50022\';\n        END IF;\n    END IF;\n\n    current_tick := GREATEST(now_ticks, last_time);\n    next_seq := CASE\n        WHEN current_tick = last_time THEN last_seq + 1\n        ELSE 0\n    END;\n\n    available_this_tick := 65536 - next_seq;\n    IF NOT allow_spanning AND requested_count > available_this_tick THEN\n        RAISE EXCEPTION\n            \'requested %%%% IDs but only %%%% remain in tick %%%% for ranj node %%%%\',\n            requested_count,\n            available_this_tick,\n            current_tick,\n            in_node_id;\n    END IF;\n\n    remaining := requested_count;\n\n    WHILE remaining > 0 LOOP\n        available_this_tick := 65536 - next_seq;\n        emit_count := LEAST(remaining, available_this_tick);\n\n        IF current_tick > (2::NUMERIC ^ 89) - 1 THEN\n            RAISE EXCEPTION \'RanjId timestamp %%%% exceeds 89-bit range (2^89 - 1)\', current_tick\n                USING ERRCODE = \'50030\';\n        END IF;\n\n        -- Decompose the 89-bit NUMERIC timestamp using division/modulo\n        -- so we never truncate at BIGINT 2^63 limit.\n        ts_high := (floor(current_tick / (2::NUMERIC ^ 41)) %%%% (2::NUMERIC ^ 48))::BIGINT;\n        ts_mid  := (floor(current_tick / (2::NUMERIC ^ 29)) %%%% (2::NUMERIC ^ 12))::BIGINT;\n        ts_low  := (current_tick %%%% (2::NUMERIC ^ 29))::BIGINT;\n\n        hi := (ts_high << 16)\n            | (8::BIGINT << 12)\n            | ts_mid;\n\n        -- lo layout (64 bits):\n        --   bit 63:    variant bit 1 (set by 0x8000000000000000)\n        --   bit 62:    variant bit 0 (0, already handled)\n        --   bits 60-61: precision (2 bits)\n        --   bits 31-59: ts_low (29 bits)\n        --   bits 16-30: node_id (15 bits)\n        --   bits 0-15:  sequence (16 bits)\n        lo_base := (-9223372036854775808)::BIGINT\n                 | (precision_bits::BIGINT << 60)\n                 | (ts_low << 31)\n                 | (in_node_id::BIGINT << 16);\n\n        RETURN QUERY\n        SELECT (\n            lpad(to_hex(hi), 16, \'0\')\n            || lpad(to_hex(lo_base | seq.s::BIGINT), 16, \'0\')\n        )::UUID AS id\n        FROM generate_series(next_seq, next_seq + emit_count - 1) AS seq(s);\n\n        last_emitted_time := current_tick;\n        last_emitted_seq := next_seq + emit_count - 1;\n        remaining := remaining - emit_count;\n        current_tick := current_tick + 1;\n        next_seq := 0;\n    END LOOP;\n\n    UPDATE heer_ranj_node_state\n    SET last_id_time = last_emitted_time,\n        last_sequence = last_emitted_seq,\n        updated_at = CURRENT_TIMESTAMP\n    WHERE node_id = in_node_id;\nEND;\n$generated$\n    $fmt$, epoch_ticks, cfg_offset, precision_bits,\n           logical_threshold::TEXT, rollback_threshold::TEXT,\n           multiplier::TEXT, multiplier::TEXT, _sch);\n\n    -- ----------------------------------------------------------------\n    -- 5. Conditionally reset node state\n    -- Node state is only reset when force_reset_state = true.\n    -- Pass true when intentionally changing epoch or precision.\n    -- The default (false) makes re-running heer_configure() safe after deploys.\n    -- ----------------------------------------------------------------\n    IF force_reset_state THEN\n        UPDATE heer_node_state      SET last_id_time = 0, last_sequence = 0, updated_at = CURRENT_TIMESTAMP;\n        UPDATE heer_ranj_node_state SET last_id_time = 0, last_sequence = 0, updated_at = CURRENT_TIMESTAMP;\n    END IF;\n\n    -- ----------------------------------------------------------------\n    -- 6. Smoke test\n    -- ----------------------------------------------------------------\n    smoke_heerid := generate_id(1);\n    smoke_ranjid := generate_ranjid(1);\n\n    RAISE NOTICE \'heer_configure() succeeded. smoke HeerId=%%, smoke RanjId=%%\', smoke_heerid, smoke_ranjid;\nEND;\n$func$;\n$sql$, _sch);\nEND;\n$install$;\n\n-- Only superusers / explicit GRANT can run this\nREVOKE EXECUTE ON FUNCTION heer_configure(BOOLEAN) FROM PUBLIC;\n";