// Auto-generated by BoltFFI. Do not edit.
package {{ module.package_name }};
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.IntFunction;
{%- if module.uses_completable_future() %}
import java.util.concurrent.CompletableFuture;
{%- endif %}
{%- if module.has_async() || module.has_async_callbacks() || module.has_streams() %}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
{%- endif %}
{%- if module.has_async() || module.has_streams() %}
import java.util.concurrent.atomic.AtomicReference;
{%- endif %}
final class WireReader {
private final byte[] data;
private int pos;
private static final int MIN_LONG_VEC_INPUT_CAPACITY = 16;
private static final ThreadLocal<ArrayList<ByteBuffer>> LONG_VEC_INPUT_SLOTS =
ThreadLocal.withInitial(ArrayList::new);
WireReader(byte[] data) {
this.data = data;
this.pos = 0;
}
boolean readBool() {
return data[pos++] != 0;
}
byte readI8() {
return data[pos++];
}
short readI16() {
short v = (short) ((data[pos] & 0xFF) | ((data[pos + 1] & 0xFF) << 8));
pos += 2;
return v;
}
int readI32() {
int v = (data[pos] & 0xFF)
| ((data[pos + 1] & 0xFF) << 8)
| ((data[pos + 2] & 0xFF) << 16)
| ((data[pos + 3] & 0xFF) << 24);
pos += 4;
return v;
}
long readI64() {
long v = (data[pos] & 0xFFL)
| ((data[pos + 1] & 0xFFL) << 8)
| ((data[pos + 2] & 0xFFL) << 16)
| ((data[pos + 3] & 0xFFL) << 24)
| ((data[pos + 4] & 0xFFL) << 32)
| ((data[pos + 5] & 0xFFL) << 40)
| ((data[pos + 6] & 0xFFL) << 48)
| ((data[pos + 7] & 0xFFL) << 56);
pos += 8;
return v;
}
float readF32() {
return Float.intBitsToFloat(readI32());
}
double readF64() {
return Double.longBitsToDouble(readI64());
}
private void requireAvailable(int len, String kind) {
if (len > data.length - pos) {
throw new RuntimeException("corrupt wire: truncated " + kind);
}
}
private int checkedByteCount(int count, int elementSize, String kind) {
if (count < 0) throw new RuntimeException("corrupt wire: negative " + kind + " length");
long byteCountLong = (long) count * (long) elementSize;
if (byteCountLong > Integer.MAX_VALUE) {
throw new RuntimeException("corrupt wire: " + kind + " payload too large");
}
int byteCount = (int) byteCountLong;
requireAvailable(byteCount, kind + " payload");
return byteCount;
}
String readString() {
int len = readI32();
if (len == 0) return "";
if (len < 0) throw new RuntimeException("corrupt wire: negative string length");
requireAvailable(len, "string payload");
String v = new String(data, pos, len, StandardCharsets.UTF_8);
pos += len;
return v;
}
byte[] readBytes() {
int len = readI32();
if (len == 0) return new byte[0];
if (len < 0) throw new RuntimeException("corrupt wire: negative bytes length");
requireAvailable(len, "bytes payload");
byte[] v = new byte[len];
System.arraycopy(data, pos, v, 0, len);
pos += len;
return v;
}
boolean[] readBooleanArray() {
int count = readI32();
if (count == 0) return new boolean[0];
int byteCount = checkedByteCount(count, 1, "boolean vec");
boolean[] result = new boolean[count];
int end = pos + byteCount;
for (int index = 0, dataIndex = pos; dataIndex < end; index++, dataIndex++) {
result[index] = data[dataIndex] != 0;
}
pos = end;
return result;
}
byte[] readByteArray() {
int count = readI32();
if (count == 0) return new byte[0];
int byteCount = checkedByteCount(count, 1, "byte vec");
byte[] result = new byte[count];
System.arraycopy(data, pos, result, 0, byteCount);
pos += byteCount;
return result;
}
short[] readShortArray() {
int count = readI32();
if (count == 0) return new short[0];
int byteCount = checkedByteCount(count, 2, "short vec");
short[] result = new short[count];
ByteBuffer.wrap(data, pos, byteCount).order(ByteOrder.LITTLE_ENDIAN).asShortBuffer().get(result);
pos += byteCount;
return result;
}
int[] readIntArray() {
int count = readI32();
if (count == 0) return new int[0];
int byteCount = checkedByteCount(count, 4, "int vec");
int[] result = new int[count];
ByteBuffer.wrap(data, pos, byteCount).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().get(result);
pos += byteCount;
return result;
}
long[] readLongArray() {
int count = readI32();
if (count == 0) return new long[0];
int byteCount = checkedByteCount(count, 8, "long vec");
long[] result = new long[count];
ByteBuffer.wrap(data, pos, byteCount).order(ByteOrder.LITTLE_ENDIAN).asLongBuffer().get(result);
pos += byteCount;
return result;
}
float[] readFloatArray() {
int count = readI32();
if (count == 0) return new float[0];
int byteCount = checkedByteCount(count, 4, "float vec");
float[] result = new float[count];
ByteBuffer.wrap(data, pos, byteCount).order(ByteOrder.LITTLE_ENDIAN).asFloatBuffer().get(result);
pos += byteCount;
return result;
}
double[] readDoubleArray() {
int count = readI32();
if (count == 0) return new double[0];
int byteCount = checkedByteCount(count, 8, "double vec");
double[] result = new double[count];
ByteBuffer.wrap(data, pos, byteCount).order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer().get(result);
pos += byteCount;
return result;
}
ByteBuffer readBlittableBuffer(int count, int elementSize) {
if (count < 0) throw new RuntimeException("corrupt wire: negative vec length");
long byteCountLong = (long) count * (long) elementSize;
if (byteCountLong > Integer.MAX_VALUE) {
throw new RuntimeException("corrupt wire: vec payload too large");
}
int byteCount = (int) byteCountLong;
requireAvailable(byteCount, "blittable vec payload");
ByteBuffer view = ByteBuffer
.wrap(data, pos, byteCount)
.slice()
.order(ByteOrder.LITTLE_ENDIAN);
pos += byteCount;
return view;
}
static boolean[] booleanArrayFromRawBuffer(byte[] buf) {
if (buf == null || buf.length == 0) return new boolean[0];
boolean[] result = new boolean[buf.length];
for (int i = 0; i < buf.length; i++) result[i] = buf[i] != 0;
return result;
}
static short[] shortArrayFromRawBuffer(byte[] buf) {
if (buf == null || buf.length == 0) return new short[0];
ByteBuffer bb = ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN);
short[] result = new short[buf.length / 2];
bb.asShortBuffer().get(result);
return result;
}
static int[] intArrayFromRawBuffer(byte[] buf) {
if (buf == null || buf.length == 0) return new int[0];
ByteBuffer bb = ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN);
int[] result = new int[buf.length / 4];
bb.asIntBuffer().get(result);
return result;
}
static long[] longArrayFromRawBuffer(byte[] buf) {
if (buf == null || buf.length == 0) return new long[0];
ByteBuffer bb = ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN);
long[] result = new long[buf.length / 8];
bb.asLongBuffer().get(result);
return result;
}
static float[] floatArrayFromRawBuffer(byte[] buf) {
if (buf == null || buf.length == 0) return new float[0];
ByteBuffer bb = ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN);
float[] result = new float[buf.length / 4];
bb.asFloatBuffer().get(result);
return result;
}
static double[] doubleArrayFromRawBuffer(byte[] buf) {
if (buf == null || buf.length == 0) return new double[0];
ByteBuffer bb = ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN);
double[] result = new double[buf.length / 8];
bb.asDoubleBuffer().get(result);
return result;
}
@FunctionalInterface
interface ByteMapper<T> {
T apply(byte value);
}
@FunctionalInterface
interface ShortMapper<T> {
T apply(short value);
}
@FunctionalInterface
interface IntMapper<T> {
T apply(int value);
}
@FunctionalInterface
interface LongMapper<T> {
T apply(long value);
}
@FunctionalInterface
interface ByteExtractor<T> {
byte apply(T value);
}
@FunctionalInterface
interface ShortExtractor<T> {
short apply(T value);
}
@FunctionalInterface
interface IntExtractor<T> {
int apply(T value);
}
@FunctionalInterface
interface LongExtractor<T> {
long apply(T value);
}
static <T> byte[] toByteArray(List<T> values, ByteExtractor<T> extractor) {
if (values.isEmpty()) return new byte[0];
byte[] result = new byte[values.size()];
int index = 0;
for (T value : values) result[index++] = extractor.apply(value);
return result;
}
static <T> short[] toShortArray(List<T> values, ShortExtractor<T> extractor) {
if (values.isEmpty()) return new short[0];
short[] result = new short[values.size()];
int index = 0;
for (T value : values) result[index++] = extractor.apply(value);
return result;
}
static <T> int[] toIntArray(List<T> values, IntExtractor<T> extractor) {
if (values.isEmpty()) return new int[0];
int[] result = new int[values.size()];
int index = 0;
for (T value : values) result[index++] = extractor.apply(value);
return result;
}
static <T> long[] toLongArray(List<T> values, LongExtractor<T> extractor) {
if (values.isEmpty()) return new long[0];
long[] result = new long[values.size()];
int index = 0;
for (T value : values) result[index++] = extractor.apply(value);
return result;
}
static ByteBuffer encodeLongVecInput(long[] values, int slot) {
if (slot < 0) {
throw new RuntimeException("invalid long vec slot index");
}
int count = values == null ? 0 : values.length;
long requiredCapacityLong = 4L + ((long) count * 8L);
if (requiredCapacityLong > Integer.MAX_VALUE) {
throw new RuntimeException("input too large: long vec payload exceeds JVM buffer limits");
}
int requiredCapacity = (int) requiredCapacityLong;
ArrayList<ByteBuffer> buffers = LONG_VEC_INPUT_SLOTS.get();
while (buffers.size() <= slot) {
buffers.add(ByteBuffer.allocateDirect(MIN_LONG_VEC_INPUT_CAPACITY).order(ByteOrder.LITTLE_ENDIAN));
}
ByteBuffer buffer = buffers.get(slot);
if (buffer.capacity() < requiredCapacity) {
int grownCapacity = Math.max(buffer.capacity() * 2, requiredCapacity);
buffer = ByteBuffer.allocateDirect(grownCapacity).order(ByteOrder.LITTLE_ENDIAN);
buffers.set(slot, buffer);
} else {
buffer.clear();
buffer.order(ByteOrder.LITTLE_ENDIAN);
}
buffer.putInt(count);
if (values != null) {
for (long value : values) {
buffer.putLong(value);
}
}
buffer.flip();
return buffer;
}
static <T> List<T> mapByteArray(byte[] values, ByteMapper<T> mapper) {
if (values == null || values.length == 0) return Collections.emptyList();
List<T> result = new ArrayList<>(values.length);
for (byte value : values) result.add(mapper.apply(value));
return result;
}
static <T> List<T> mapShortArray(short[] values, ShortMapper<T> mapper) {
if (values.length == 0) return Collections.emptyList();
List<T> result = new ArrayList<>(values.length);
for (short value : values) result.add(mapper.apply(value));
return result;
}
static <T> List<T> mapShortRawBuffer(byte[] values, ShortMapper<T> mapper) {
if (values == null || values.length == 0) return Collections.emptyList();
if ((values.length & 1) != 0) {
throw new RuntimeException("corrupt buffer: expected short-aligned enum vec payload");
}
ByteBuffer buffer = ByteBuffer.wrap(values).order(ByteOrder.LITTLE_ENDIAN);
int count = values.length / 2;
List<T> result = new ArrayList<>(count);
for (int index = 0; index < count; index++) result.add(mapper.apply(buffer.getShort()));
return result;
}
static <T> List<T> mapIntArray(int[] values, IntMapper<T> mapper) {
if (values.length == 0) return Collections.emptyList();
List<T> result = new ArrayList<>(values.length);
for (int value : values) result.add(mapper.apply(value));
return result;
}
static <T> List<T> mapIntRawBuffer(byte[] values, IntMapper<T> mapper) {
if (values == null || values.length == 0) return Collections.emptyList();
if ((values.length & 3) != 0) {
throw new RuntimeException("corrupt buffer: expected int-aligned enum vec payload");
}
ByteBuffer buffer = ByteBuffer.wrap(values).order(ByteOrder.LITTLE_ENDIAN);
int count = values.length / 4;
List<T> result = new ArrayList<>(count);
for (int index = 0; index < count; index++) result.add(mapper.apply(buffer.getInt()));
return result;
}
static <T> List<T> mapLongArray(long[] values, LongMapper<T> mapper) {
if (values.length == 0) return Collections.emptyList();
List<T> result = new ArrayList<>(values.length);
for (long value : values) result.add(mapper.apply(value));
return result;
}
static <T> List<T> mapLongRawBuffer(byte[] values, LongMapper<T> mapper) {
if (values == null || values.length == 0) return Collections.emptyList();
if ((values.length & 7) != 0) {
throw new RuntimeException("corrupt buffer: expected long-aligned enum vec payload");
}
ByteBuffer buffer = ByteBuffer.wrap(values).order(ByteOrder.LITTLE_ENDIAN);
int count = values.length / 8;
List<T> result = new ArrayList<>(count);
for (int index = 0; index < count; index++) result.add(mapper.apply(buffer.getLong()));
return result;
}
<T> List<T> readList(IntFunction<T> decoder) {
int count = readI32();
List<T> result = new ArrayList<>(count);
for (int i = 0; i < count; i++) result.add(decoder.apply(i));
return result;
}
static <T> List<T> readList(byte[] data, IntFunction<T> decoder) {
return new WireReader(data).readList(decoder);
}
static WireReader fromBuffer(ByteBuffer buffer) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return new WireReader(bytes);
}
static <T> T decodeBuffer(ByteBuffer buffer, Function<WireReader, T> decoder) {
return decoder.apply(fromBuffer(buffer));
}
static String stringFromBuffer(ByteBuffer buffer) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return new String(bytes, StandardCharsets.UTF_8);
}
static byte[] wireEncodeIntArray(int[] values) {
ByteBuffer buf = ByteBuffer.allocate(4 + values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
buf.putInt(values.length);
for (int v : values) buf.putInt(v);
return buf.array();
}
static List<Boolean> readPackedBools(byte[] bytes) {
List<Boolean> result = new ArrayList<>(bytes.length);
for (byte b : bytes) result.add(b != 0);
return result;
}
static List<Byte> readPackedBytes(byte[] bytes) {
List<Byte> result = new ArrayList<>(bytes.length);
for (byte b : bytes) result.add(b);
return result;
}
static List<Short> readPackedShorts(byte[] bytes) {
java.nio.ShortBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()).asShortBuffer();
List<Short> result = new ArrayList<>(buf.remaining());
while (buf.hasRemaining()) result.add(buf.get());
return result;
}
static List<Integer> readPackedInts(byte[] bytes) {
java.nio.IntBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()).asIntBuffer();
List<Integer> result = new ArrayList<>(buf.remaining());
while (buf.hasRemaining()) result.add(buf.get());
return result;
}
static List<Long> readPackedLongs(byte[] bytes) {
java.nio.LongBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()).asLongBuffer();
List<Long> result = new ArrayList<>(buf.remaining());
while (buf.hasRemaining()) result.add(buf.get());
return result;
}
static List<Float> readPackedFloats(byte[] bytes) {
java.nio.FloatBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()).asFloatBuffer();
List<Float> result = new ArrayList<>(buf.remaining());
while (buf.hasRemaining()) result.add(buf.get());
return result;
}
static List<Double> readPackedDoubles(byte[] bytes) {
java.nio.DoubleBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()).asDoubleBuffer();
List<Double> result = new ArrayList<>(buf.remaining());
while (buf.hasRemaining()) result.add(buf.get());
return result;
}
}
{% if module.needs_wire_writer() %}
final class WireWriter implements AutoCloseable {
private static final int MIN_CAPACITY = 16;
private static final int MAX_POOLED_BUFFERS_PER_THREAD = 8;
private static final int MAX_POOLED_CAPACITY = 1 << 20;
private static final ThreadLocal<ArrayDeque<ByteBuffer>> POOL =
ThreadLocal.withInitial(ArrayDeque::new);
private ByteBuffer buf;
private boolean closed;
WireWriter(int capacity) {
this.buf = takeBuffer(capacity);
this.closed = false;
}
ByteBuffer toBuffer() {
ByteBuffer view = buf.duplicate().order(ByteOrder.LITTLE_ENDIAN);
view.flip();
return view.slice().order(ByteOrder.LITTLE_ENDIAN);
}
byte[] toByteArray() {
ByteBuffer view = toBuffer();
byte[] bytes = new byte[view.remaining()];
view.get(bytes);
return bytes;
}
private void ensureCapacity(int needed) {
if (buf.remaining() >= needed) return;
int next = Math.max(buf.capacity() * 2, buf.position() + needed);
ByteBuffer grown = allocateBuffer(next);
buf.flip();
grown.put(buf);
recycleBuffer(buf);
buf = grown;
}
void writeBool(boolean v) { ensureCapacity(1); buf.put((byte) (v ? 1 : 0)); }
void writeI8(byte v) { ensureCapacity(1); buf.put(v); }
void writeI16(short v) { ensureCapacity(2); buf.putShort(v); }
void writeI32(int v) { ensureCapacity(4); buf.putInt(v); }
void writeI64(long v) { ensureCapacity(8); buf.putLong(v); }
void writeF32(float v) { ensureCapacity(4); buf.putFloat(v); }
void writeF64(double v) { ensureCapacity(8); buf.putDouble(v); }
@FunctionalInterface
interface RawBytesWriter {
void write(ByteBuffer buffer, int baseOffset);
}
void writeRawBytes(int byteCount, RawBytesWriter writer) {
if (byteCount <= 0) return;
ensureCapacity(byteCount);
int baseOffset = buf.position();
writer.write(buf, baseOffset);
buf.position(baseOffset + byteCount);
}
void writeString(String v) {
byte[] utf8 = v.getBytes(StandardCharsets.UTF_8);
writeI32(utf8.length);
ensureCapacity(utf8.length);
buf.put(utf8);
}
void writeBytes(byte[] v) {
writeI32(v.length);
ensureCapacity(v.length);
buf.put(v);
}
void writeBooleanArray(boolean[] v) {
writeI32(v.length);
ensureCapacity(v.length);
for (boolean b : v) buf.put((byte) (b ? 1 : 0));
}
void writeByteArray(byte[] v) {
writeI32(v.length);
ensureCapacity(v.length);
buf.put(v);
}
void writeShortArray(short[] v) {
writeI32(v.length);
int byteCount = v.length * 2;
ensureCapacity(byteCount);
buf.asShortBuffer().put(v);
buf.position(buf.position() + byteCount);
}
void writeIntArray(int[] v) {
writeI32(v.length);
int byteCount = v.length * 4;
ensureCapacity(byteCount);
buf.asIntBuffer().put(v);
buf.position(buf.position() + byteCount);
}
void writeLongArray(long[] v) {
writeI32(v.length);
int byteCount = v.length * 8;
ensureCapacity(byteCount);
buf.asLongBuffer().put(v);
buf.position(buf.position() + byteCount);
}
void writeFloatArray(float[] v) {
writeI32(v.length);
int byteCount = v.length * 4;
ensureCapacity(byteCount);
buf.asFloatBuffer().put(v);
buf.position(buf.position() + byteCount);
}
void writeDoubleArray(double[] v) {
writeI32(v.length);
int byteCount = v.length * 8;
ensureCapacity(byteCount);
buf.asDoubleBuffer().put(v);
buf.position(buf.position() + byteCount);
}
static int stringWireSize(String v) {
return 4 + (v.length() * 3);
}
static int vecLength(List<?> v) {
return v.size();
}
static int vecLength(boolean[] v) {
return v.length;
}
static int vecLength(byte[] v) {
return v.length;
}
static int vecLength(short[] v) {
return v.length;
}
static int vecLength(int[] v) {
return v.length;
}
static int vecLength(long[] v) {
return v.length;
}
static int vecLength(float[] v) {
return v.length;
}
static int vecLength(double[] v) {
return v.length;
}
static <T> int listWireSize(List<T> list, java.util.function.ToIntFunction<T> sizer) {
int total = 4;
for (T item : list) total += sizer.applyAsInt(item);
return total;
}
static <T> boolean listEquals(
List<T> left,
List<T> right,
java.util.function.BiPredicate<T, T> itemEquals
) {
if (left == right) return true;
if (left == null || right == null) return false;
if (left.size() != right.size()) return false;
for (int index = 0; index < left.size(); index++) {
if (!itemEquals.test(left.get(index), right.get(index))) return false;
}
return true;
}
static <T> int listHash(List<T> values, java.util.function.ToIntFunction<T> itemHash) {
if (values == null) return 0;
int result = 1;
for (T value : values) {
result = 31 * result + itemHash.applyAsInt(value);
}
return result;
}
@Override
public void close() {
if (closed) return;
closed = true;
recycleBuffer(buf);
buf = null;
}
private static ByteBuffer takeBuffer(int requestedCapacity) {
int requiredCapacity = Math.max(requestedCapacity, MIN_CAPACITY);
ArrayDeque<ByteBuffer> pool = POOL.get();
int remaining = pool.size();
while (remaining-- > 0) {
ByteBuffer candidate = pool.pollFirst();
if (candidate.capacity() >= requiredCapacity) {
candidate.clear();
candidate.order(ByteOrder.LITTLE_ENDIAN);
return candidate;
}
pool.offerLast(candidate);
}
return allocateBuffer(requiredCapacity);
}
private static ByteBuffer allocateBuffer(int capacity) {
return ByteBuffer.allocateDirect(capacity).order(ByteOrder.LITTLE_ENDIAN);
}
private static void recycleBuffer(ByteBuffer buffer) {
if (buffer == null) return;
if (buffer.capacity() > MAX_POOLED_CAPACITY) return;
ArrayDeque<ByteBuffer> pool = POOL.get();
if (pool.size() >= MAX_POOLED_BUFFERS_PER_THREAD) return;
buffer.clear();
buffer.order(ByteOrder.LITTLE_ENDIAN);
pool.offerFirst(buffer);
}
}
{% endif %}
final class BoltFFIResult<Ok, Err> {
private final Ok okValue;
private final Err errValue;
private final boolean ok;
private BoltFFIResult(Ok okValue, Err errValue, boolean ok) {
this.okValue = okValue;
this.errValue = errValue;
this.ok = ok;
}
static <Ok, Err> BoltFFIResult<Ok, Err> ok(Ok value) {
return new BoltFFIResult<>(value, null, true);
}
static <Ok, Err> BoltFFIResult<Ok, Err> err(Err value) {
return new BoltFFIResult<>(null, value, false);
}
boolean isOk() {
return ok;
}
Ok okValue() {
return okValue;
}
Err errValue() {
return errValue;
}
}
{%- if module.has_async() || module.has_streams() %}
final class BoltFFIContinuationSignal {
enum DeliveryState { WAITING, PENDING_DELIVERY, READY_DELIVERY, CANCELLED }
enum CancellationClaim { OWNED, DEFERRED, REJECTED }
private final AtomicReference<DeliveryState> deliveryState;
private final CompletableFuture<Byte> future;
private final Runnable onReady;
BoltFFIContinuationSignal(CompletableFuture<Byte> future, Runnable onReady) {
this.deliveryState = new AtomicReference<>(DeliveryState.WAITING);
this.future = future;
this.onReady = onReady;
}
boolean beginCompletion(byte pollResult) {
DeliveryState delivery = pollResult == 0 ? DeliveryState.READY_DELIVERY : DeliveryState.PENDING_DELIVERY;
if (!deliveryState.compareAndSet(DeliveryState.WAITING, delivery)) return false;
if (delivery == DeliveryState.READY_DELIVERY && onReady != null) {
onReady.run();
}
return true;
}
CancellationClaim claimForCancellation() {
while (true) {
DeliveryState currentState = deliveryState.get();
if (currentState == DeliveryState.READY_DELIVERY) {
return CancellationClaim.REJECTED;
}
if (currentState == DeliveryState.PENDING_DELIVERY) {
return CancellationClaim.DEFERRED;
}
if (currentState == DeliveryState.CANCELLED) {
return CancellationClaim.OWNED;
}
if (deliveryState.compareAndSet(DeliveryState.WAITING, DeliveryState.CANCELLED)) {
return CancellationClaim.OWNED;
}
}
}
boolean isReadyDeliveryStarted() {
return deliveryState.get() == DeliveryState.READY_DELIVERY;
}
void finishCompletion(byte pollResult) {
future.complete(pollResult);
}
}
final class BoltFFIContinuationMap {
private static final AtomicLong NEXT_HANDLE = new AtomicLong(1);
private static final ConcurrentHashMap<Long, BoltFFIContinuationSignal> MAP = new ConcurrentHashMap<>();
static long insert(CompletableFuture<Byte> future) {
return insert(new BoltFFIContinuationSignal(future, null));
}
static long insert(BoltFFIContinuationSignal signal) {
long handle = NEXT_HANDLE.getAndIncrement();
MAP.put(handle, signal);
return handle;
}
static void complete(long handle, byte pollResult) {
BoltFFIContinuationSignal signal = MAP.get(handle);
if (signal != null && signal.beginCompletion(pollResult)) {
signal.finishCompletion(pollResult);
MAP.remove(handle, signal);
}
}
static void remove(long handle, BoltFFIContinuationSignal signal) {
MAP.remove(handle, signal);
}
}
{%- if module.has_async_callbacks() %}
final class BoltFFICallbackCompletion {
private final java.util.function.Consumer<Object> onSuccess;
private final java.util.function.Consumer<Throwable> onFailure;
BoltFFICallbackCompletion(
java.util.function.Consumer<Object> onSuccess,
java.util.function.Consumer<Throwable> onFailure
) {
this.onSuccess = onSuccess;
this.onFailure = onFailure;
}
void completeSuccess(Object value) {
onSuccess.accept(value);
}
void completeFailure(Throwable error) {
onFailure.accept(error);
}
}
final class BoltFFICallbackFutureMap {
private static final AtomicLong NEXT_HANDLE = new AtomicLong(1);
private static final ConcurrentHashMap<Long, BoltFFICallbackCompletion> MAP = new ConcurrentHashMap<>();
static long insert(
java.util.function.Consumer<Object> onSuccess,
java.util.function.Consumer<Throwable> onFailure
) {
long handle = NEXT_HANDLE.getAndIncrement();
MAP.put(handle, new BoltFFICallbackCompletion(onSuccess, onFailure));
return handle;
}
static void completeSuccess(long handle, Object value) {
BoltFFICallbackCompletion completion = MAP.remove(handle);
if (completion != null) {
completion.completeSuccess(value);
}
}
static void completeFailure(long handle, Throwable error) {
BoltFFICallbackCompletion completion = MAP.remove(handle);
if (completion != null) {
completion.completeFailure(error);
}
}
}
{%- endif %}
{%- if module.async_mode.is_virtual_thread() %}
final class BoltFFIAsync {
private static final byte POLL_READY = 0;
interface FutureCreator {
long create();
}
interface FuturePoll {
void poll(long future, long contHandle);
}
interface FutureComplete<T> {
T complete(long future);
}
interface FutureLifecycle {
void apply(long future);
}
static <T> T callAsync(
FutureCreator createFuture,
FuturePoll poll,
FutureComplete<T> complete,
FutureLifecycle free,
FutureLifecycle cancel
) {
return callAsyncInternal(createFuture, poll, complete, free, cancel);
}
static void callAsyncVoid(
FutureCreator createFuture,
FuturePoll poll,
FutureLifecycle completeVoid,
FutureLifecycle free,
FutureLifecycle cancel
) {
callAsyncInternal(
createFuture,
poll,
rustFuture -> {
completeVoid.apply(rustFuture);
return null;
},
free,
cancel
);
}
private static <T> T callAsyncInternal(
FutureCreator createFuture,
FuturePoll poll,
FutureComplete<T> complete,
FutureLifecycle free,
FutureLifecycle cancel
) {
long rustFuture = createFuture.create();
try {
byte pollResult;
do {
CompletableFuture<Byte> signal = new CompletableFuture<>();
long contHandle = BoltFFIContinuationMap.insert(signal);
poll.poll(rustFuture, contHandle);
try {
pollResult = signal.get();
} catch (Exception e) {
cancel.apply(rustFuture);
throw new RuntimeException("async poll interrupted", e);
}
} while (pollResult != POLL_READY);
return complete.complete(rustFuture);
} finally {
free.apply(rustFuture);
}
}
}
{%- else %}
final class BoltFFIAsync {
private static final byte POLL_READY = 0;
interface FutureCreator {
long create();
}
interface FuturePoll {
void poll(long future, long contHandle);
}
interface FutureComplete<T> {
T complete(long future);
}
interface FutureLifecycle {
void apply(long future);
}
private enum FutureState { ACTIVE, POLLING, WAITING, CANCEL_REQUESTED, READY, OWNED }
private enum CancelDisposition { REJECTED, DEFERRED, OWNED }
private enum PendingJoinDisposition { STOPPED, CONTINUE, CANCELLED }
private static final class ActiveContinuation {
private final long handle;
private final BoltFFIContinuationSignal signal;
private ActiveContinuation(long handle, BoltFFIContinuationSignal signal) {
this.handle = handle;
this.signal = signal;
}
}
private static final class ManagedFuture<T> extends CompletableFuture<T> {
private final long rustFuture;
private final AtomicReference<FutureState> state;
private final AtomicReference<ActiveContinuation> activeContinuation;
private final FutureLifecycle cancel;
private final FutureLifecycle free;
private ManagedFuture(
long rustFuture,
AtomicReference<FutureState> state,
AtomicReference<ActiveContinuation> activeContinuation,
FutureLifecycle cancel,
FutureLifecycle free
) {
this.rustFuture = rustFuture;
this.state = state;
this.activeContinuation = activeContinuation;
this.cancel = cancel;
this.free = free;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
CancelDisposition cancelDisposition = requestCancellation(state, activeContinuation);
if (cancelDisposition == CancelDisposition.REJECTED) return false;
if (cancelDisposition == CancelDisposition.DEFERRED) {
return super.cancel(mayInterruptIfRunning);
}
cancelAndFree(rustFuture, cancel, free);
return super.cancel(mayInterruptIfRunning);
}
private void finishCancelled() {
cancelAndFree(rustFuture, cancel, free);
super.cancel(false);
}
}
private static void cancelAndFree(long rustFuture, FutureLifecycle cancel, FutureLifecycle free) {
try { cancel.apply(rustFuture); } catch (Exception ignored) {}
try { free.apply(rustFuture); } catch (Exception ignored) {}
}
private static void releaseContinuation(
AtomicReference<ActiveContinuation> activeContinuation,
ActiveContinuation activePoll
) {
activeContinuation.compareAndSet(activePoll, null);
}
private static <T> void finishCancelled(
ManagedFuture<T> result,
AtomicReference<ActiveContinuation> activeContinuation,
ActiveContinuation activePoll
) {
releaseContinuation(activeContinuation, activePoll);
result.finishCancelled();
}
private static <T> boolean finishDeferredCancellation(
AtomicReference<FutureState> state,
ManagedFuture<T> result
) {
if (!claimDeferredCancellation(state)) return false;
result.finishCancelled();
return true;
}
private static <T> boolean finishDeferredCancellation(
AtomicReference<FutureState> state,
ManagedFuture<T> result,
AtomicReference<ActiveContinuation> activeContinuation,
ActiveContinuation activePoll
) {
if (!claimDeferredCancellation(state)) return false;
finishCancelled(result, activeContinuation, activePoll);
return true;
}
private static <T> void finishPollingFailure(
long rustFuture,
FutureLifecycle free,
FutureLifecycle cancel,
ManagedFuture<T> result,
AtomicReference<FutureState> state,
AtomicReference<ActiveContinuation> activeContinuation,
ActiveContinuation activePoll,
Exception error
) {
releaseContinuation(activeContinuation, activePoll);
state.set(FutureState.OWNED);
cancelAndFree(rustFuture, cancel, free);
result.completeExceptionally(error);
}
private static <T> void finishJoinedReady(
long rustFuture,
FutureComplete<T> complete,
FutureLifecycle free,
ManagedFuture<T> result,
AtomicReference<FutureState> state,
AtomicReference<ActiveContinuation> activeContinuation,
ActiveContinuation activePoll
) {
if (claimReady(state)) {
completeReadyResult(rustFuture, complete, free, result);
releaseContinuation(activeContinuation, activePoll);
return;
}
finishDeferredCancellation(state, result, activeContinuation, activePoll);
releaseContinuation(activeContinuation, activePoll);
}
private static CancelDisposition requestCancellation(
AtomicReference<FutureState> state,
AtomicReference<ActiveContinuation> activeContinuation
) {
while (true) {
FutureState currentState = state.get();
if (currentState == FutureState.OWNED || currentState == FutureState.READY) {
return CancelDisposition.REJECTED;
}
if (currentState == FutureState.CANCEL_REQUESTED) {
return CancelDisposition.DEFERRED;
}
if (currentState == FutureState.ACTIVE) {
if (state.compareAndSet(currentState, FutureState.OWNED)) {
return CancelDisposition.OWNED;
}
continue;
}
if (currentState == FutureState.WAITING) {
ActiveContinuation continuation = activeContinuation.get();
if (continuation == null) continue;
BoltFFIContinuationSignal.CancellationClaim continuationClaim =
continuation.signal.claimForCancellation();
if (continuationClaim == BoltFFIContinuationSignal.CancellationClaim.OWNED) {
if (!state.compareAndSet(FutureState.WAITING, FutureState.OWNED)) continue;
releaseContinuation(activeContinuation, continuation);
BoltFFIContinuationMap.remove(continuation.handle, continuation.signal);
return CancelDisposition.OWNED;
}
if (continuationClaim == BoltFFIContinuationSignal.CancellationClaim.REJECTED) {
return CancelDisposition.REJECTED;
}
if (!state.compareAndSet(FutureState.WAITING, FutureState.CANCEL_REQUESTED)) continue;
return CancelDisposition.DEFERRED;
}
if (currentState == FutureState.POLLING) {
ActiveContinuation continuation = activeContinuation.get();
if (continuation != null && continuation.signal.isReadyDeliveryStarted()) {
return CancelDisposition.REJECTED;
}
if (state.compareAndSet(FutureState.POLLING, FutureState.CANCEL_REQUESTED)) {
ActiveContinuation updatedContinuation = activeContinuation.get();
if (updatedContinuation != null && updatedContinuation.signal.isReadyDeliveryStarted()) {
if (state.compareAndSet(FutureState.CANCEL_REQUESTED, FutureState.READY)) {
return CancelDisposition.REJECTED;
}
continue;
}
return CancelDisposition.DEFERRED;
}
}
}
}
private static void markReady(AtomicReference<FutureState> state) {
while (true) {
FutureState currentState = state.get();
if (currentState == FutureState.CANCEL_REQUESTED
|| currentState == FutureState.READY
|| currentState == FutureState.OWNED) return;
if (state.compareAndSet(currentState, FutureState.READY)) return;
}
}
private static boolean claimReady(AtomicReference<FutureState> state) {
return state.compareAndSet(FutureState.READY, FutureState.OWNED);
}
private static boolean claimDeferredCancellation(AtomicReference<FutureState> state) {
return state.compareAndSet(FutureState.CANCEL_REQUESTED, FutureState.OWNED);
}
private static boolean rearmPolling(AtomicReference<FutureState> state) {
while (true) {
FutureState currentState = state.get();
if (currentState == FutureState.WAITING || currentState == FutureState.POLLING) {
if (state.compareAndSet(currentState, FutureState.ACTIVE)) return true;
continue;
}
return false;
}
}
private static boolean enterWaiting(AtomicReference<FutureState> state) {
return state.compareAndSet(FutureState.POLLING, FutureState.WAITING);
}
private static PendingJoinDisposition finishJoinedPending(
AtomicReference<FutureState> state,
AtomicReference<ActiveContinuation> activeContinuation,
ActiveContinuation activePoll
) {
if (claimDeferredCancellation(state)) {
releaseContinuation(activeContinuation, activePoll);
return PendingJoinDisposition.CANCELLED;
}
PendingJoinDisposition pendingJoinDisposition = rearmPolling(state)
? PendingJoinDisposition.CONTINUE
: claimDeferredCancellation(state)
? PendingJoinDisposition.CANCELLED
: PendingJoinDisposition.STOPPED;
releaseContinuation(activeContinuation, activePoll);
return pendingJoinDisposition;
}
static <T> CompletableFuture<T> callAsync(
FutureCreator createFuture,
FuturePoll poll,
FutureComplete<T> complete,
FutureLifecycle free,
FutureLifecycle cancel
) {
return callAsyncInternal(createFuture, poll, complete, free, cancel);
}
static CompletableFuture<Void> callAsyncVoid(
FutureCreator createFuture,
FuturePoll poll,
FutureLifecycle completeVoid,
FutureLifecycle free,
FutureLifecycle cancel
) {
return callAsyncInternal(
createFuture,
poll,
rustFuture -> {
completeVoid.apply(rustFuture);
return null;
},
free,
cancel
);
}
private static <T> CompletableFuture<T> callAsyncInternal(
FutureCreator createFuture,
FuturePoll poll,
FutureComplete<T> complete,
FutureLifecycle free,
FutureLifecycle cancel
) {
long rustFuture;
try {
rustFuture = createFuture.create();
} catch (Exception e) {
CompletableFuture<T> failed = new CompletableFuture<>();
failed.completeExceptionally(e);
return failed;
}
AtomicReference<FutureState> state = new AtomicReference<>(FutureState.ACTIVE);
AtomicReference<ActiveContinuation> activeContinuation = new AtomicReference<>(null);
ManagedFuture<T> result = new ManagedFuture<>(rustFuture, state, activeContinuation, cancel, free);
pollLoop(rustFuture, poll, complete, free, cancel, result, state, activeContinuation);
return result;
}
private static <T> void pollLoop(
long rustFuture,
FuturePoll poll,
FutureComplete<T> complete,
FutureLifecycle free,
FutureLifecycle cancel,
ManagedFuture<T> result,
AtomicReference<FutureState> state,
AtomicReference<ActiveContinuation> activeContinuation
) {
while (true) {
if (!state.compareAndSet(FutureState.ACTIVE, FutureState.POLLING)) return;
CompletableFuture<Byte> signal = new CompletableFuture<>();
BoltFFIContinuationSignal continuationSignal = new BoltFFIContinuationSignal(signal, () -> markReady(state));
long contHandle = BoltFFIContinuationMap.insert(continuationSignal);
ActiveContinuation activePoll = new ActiveContinuation(contHandle, continuationSignal);
activeContinuation.set(activePoll);
poll.poll(rustFuture, contHandle);
if (!signal.isDone()) {
if (enterWaiting(state)) {
signal.whenComplete((pollResult, error) -> {
releaseContinuation(activeContinuation, activePoll);
if (error != null) {
if (state.getAndSet(FutureState.OWNED) != FutureState.OWNED) {
cancelAndFree(rustFuture, cancel, free);
}
result.completeExceptionally(error);
return;
}
if (pollResult == POLL_READY) {
if (claimReady(state)) {
completeReadyResult(rustFuture, complete, free, result);
return;
}
finishDeferredCancellation(state, result);
return;
}
if (finishDeferredCancellation(state, result)) return;
if (rearmPolling(state)) {
pollLoop(rustFuture, poll, complete, free, cancel, result, state, activeContinuation);
}
});
return;
}
releaseContinuation(activeContinuation, activePoll);
if (finishDeferredCancellation(state, result)) return;
if (claimReady(state)) {
completeReadyResult(rustFuture, complete, free, result);
}
return;
}
byte pollResult;
try {
pollResult = signal.join();
} catch (Exception e) {
finishPollingFailure(
rustFuture,
free,
cancel,
result,
state,
activeContinuation,
activePoll,
e
);
return;
}
if (pollResult == POLL_READY) {
finishJoinedReady(
rustFuture,
complete,
free,
result,
state,
activeContinuation,
activePoll
);
return;
}
if (finishDeferredCancellation(state, result, activeContinuation, activePoll)) return;
PendingJoinDisposition pendingJoinDisposition =
finishJoinedPending(state, activeContinuation, activePoll);
if (pendingJoinDisposition == PendingJoinDisposition.CANCELLED) {
result.finishCancelled();
return;
}
if (pendingJoinDisposition == PendingJoinDisposition.CONTINUE) {
continue;
}
return;
}
}
private static <T> void completeReadyResult(
long rustFuture,
FutureComplete<T> complete,
FutureLifecycle free,
ManagedFuture<T> result
) {
try {
T value = complete.complete(rustFuture);
result.complete(value);
} catch (Exception e) {
result.completeExceptionally(e);
} finally {
try { free.apply(rustFuture); } catch (Exception ignored) {}
}
}
}
{%- endif %}
{%- endif %}
{%- if module.has_streams() %}
final class BoltFFIStreamContext {
private static final byte POLL_CLOSED = 1;
private final long subscription;
private final long batchSize;
private final java.util.function.BiFunction<Long, Long, byte[]> popBatch;
private final java.util.function.BiConsumer<Long, Long> poll;
private final java.util.function.LongConsumer unsubscribe;
private final java.util.function.LongConsumer freeFn;
private final java.util.function.Consumer<byte[]> processItems;
private final Runnable finish;
private final java.util.concurrent.atomic.AtomicInteger lifecycleTag = new java.util.concurrent.atomic.AtomicInteger(0);
private final java.util.concurrent.atomic.AtomicInteger callbackTag = new java.util.concurrent.atomic.AtomicInteger(0);
BoltFFIStreamContext(
long subscription, long batchSize,
java.util.function.BiFunction<Long, Long, byte[]> popBatch,
java.util.function.BiConsumer<Long, Long> poll,
java.util.function.LongConsumer unsubscribe,
java.util.function.LongConsumer freeFn,
java.util.function.Consumer<byte[]> processItems,
Runnable finish) {
this.subscription = subscription;
this.batchSize = batchSize;
this.popBatch = popBatch;
this.poll = poll;
this.unsubscribe = unsubscribe;
this.freeFn = freeFn;
this.processItems = processItems;
this.finish = finish;
}
void start() {
registerPoll();
}
void requestTermination() {
boolean started = lifecycleTag.compareAndSet(0, 1);
if (started) {
unsubscribe.accept(subscription);
lifecycleTag.compareAndSet(1, 2);
}
attemptFinalize();
}
private void attemptFinalize() {
if (!callbackTag.compareAndSet(0, 0)) return;
if (!lifecycleTag.compareAndSet(2, 3)) return;
freeFn.accept(subscription);
finish.run();
}
private void registerPoll() {
if (!lifecycleTag.compareAndSet(0, 0)) {
attemptFinalize();
return;
}
CompletableFuture<Byte> future = new CompletableFuture<>();
future.thenAccept(this::handlePoll);
long contHandle = BoltFFIContinuationMap.insert(future);
poll.accept(subscription, contHandle);
}
private void handlePoll(byte pollResult) {
boolean isClosed = pollResult == POLL_CLOSED;
if (!callbackTag.compareAndSet(0, 1)) {
attemptFinalize();
return;
}
try {
if (!lifecycleTag.compareAndSet(0, 0)) return;
while (true) {
byte[] bytes = popBatch.apply(subscription, batchSize);
if (bytes == null) throw new RuntimeException("BoltFFI: stream pop_batch failed (null)");
if (bytes.length == 0) break;
processItems.accept(bytes);
}
} finally {
callbackTag.compareAndSet(1, 0);
attemptFinalize();
}
if (isClosed) {
requestTermination();
return;
}
if (!lifecycleTag.compareAndSet(0, 0)) return;
registerPoll();
}
}
/**
* Represents a stream subscription returned by a generated Java binding.
*
* <p>All subscriptions can be cancelled or closed. Some subscriptions deliver items through a
* callback and only support lifecycle operations. Batch subscriptions keep items buffered on the
* native side and additionally support waiting, explicit batch reads, and
* {@code Flow.Publisher} adaptation.
*/
final class StreamSubscription<T> implements AutoCloseable {
private enum Mode {
BATCH,
CALLBACK
}
private final java.util.concurrent.atomic.AtomicBoolean closed = new java.util.concurrent.atomic.AtomicBoolean(false);
private final java.util.concurrent.atomic.AtomicBoolean publisherAttached = new java.util.concurrent.atomic.AtomicBoolean(false);
private final Mode mode;
private final long handle;
private final Runnable cancelAction;
private final java.util.function.BiFunction<Long, Long, byte[]> popBatchFn;
private final java.util.function.BiFunction<Long, Integer, Integer> waitFn;
private final java.util.function.Function<byte[], java.util.List<T>> decodeItems;
private StreamSubscription(
Mode mode,
long handle,
Runnable cancelAction,
java.util.function.BiFunction<Long, Long, byte[]> popBatchFn,
java.util.function.BiFunction<Long, Integer, Integer> waitFn,
java.util.function.Function<byte[], java.util.List<T>> decodeItems) {
this.mode = mode;
this.handle = handle;
this.cancelAction = cancelAction;
this.popBatchFn = popBatchFn;
this.waitFn = waitFn;
this.decodeItems = decodeItems;
}
static <T> StreamSubscription<T> callback(Runnable cancelAction) {
return new StreamSubscription<>(Mode.CALLBACK, 0L, cancelAction, null, null, null);
}
static <T> StreamSubscription<T> batch(
long handle,
java.util.function.BiFunction<Long, Long, byte[]> popBatchFn,
java.util.function.BiFunction<Long, Integer, Integer> waitFn,
java.util.function.LongConsumer unsubscribeFn,
java.util.function.LongConsumer freeFn,
java.util.function.Function<byte[], java.util.List<T>> decodeItems) {
return new StreamSubscription<>(
Mode.BATCH,
handle,
() -> {
if (handle == 0L) return;
unsubscribeFn.accept(handle);
freeFn.accept(handle);
},
popBatchFn,
waitFn,
decodeItems
);
}
/**
* Reads and removes up to {@code maxCount} items that are currently buffered for this
* subscription.
*
* <p>If no items are buffered, this returns an empty list immediately. This method is only
* valid for batch subscriptions; callback-driven subscriptions throw
* {@link IllegalStateException} because their items are drained directly into the callback
* path instead of being exposed as pull-based batches.
*/
public java.util.List<T> popBatch(long maxCount) {
requireBatchMode("popBatch");
if (handle == 0L) return java.util.Collections.emptyList();
byte[] _bytes = popBatchFn.apply(handle, maxCount);
if (_bytes == null) throw new RuntimeException("BoltFFI: stream pop_batch failed (null)");
if (_bytes.length == 0) return java.util.Collections.emptyList();
return decodeItems.apply(_bytes);
}
/**
* Blocks for up to {@code timeout} milliseconds waiting for new items to become available.
*
* <p>The returned value is the native wait result: a positive value means items are ready,
* {@code 0} means the wait timed out, and a negative value means the subscription was closed
* or unsubscribed. This method is only valid for batch subscriptions; callback-driven
* subscriptions throw {@link IllegalStateException}.
*/
public int waitForItems(int timeout) {
requireBatchMode("waitForItems");
if (handle == 0L) return -1;
return waitFn.apply(handle, timeout);
}
/**
* Stops further delivery and releases the underlying native subscription.
*
* <p>After this call, no more items will be delivered. For batch subscriptions this also
* wakes any pending waits and releases the native handle. For callback-driven subscriptions
* it stops future callback delivery. Safe to call multiple times.
*/
public void unsubscribe() {
close();
}
/**
* Cancels this subscription.
*
* <p>This is an alias for {@link #close()} so callers can use either cancellation-style or
* resource-style lifecycle management.
*/
public void cancel() {
close();
}
/**
* Closes this subscription.
*
* <p>Batch subscriptions unsubscribe from the underlying native stream and free the native
* handle. Callback-driven subscriptions stop future callback delivery. Safe to call multiple
* times; only the first call has any effect. Intended for use with try-with-resources and any
* other code path that needs deterministic shutdown.
*/
@Override
public void close() {
if (!closed.compareAndSet(false, true)) return;
cancelAction.run();
}
{%- if module.java_version.supports_flow_api() %}
/**
* Exposes a batch subscription as a live {@link java.util.concurrent.Flow.Publisher}.
*
* <p>The publisher waits for new native items, drains them in batches, and forwards them to
* the subscriber according to requested demand until the subscription is closed. Only one
* publisher can be attached to a given subscription. This method is only valid for batch
* subscriptions; callback-driven subscriptions throw {@link IllegalStateException}.
*/
public java.util.concurrent.Flow.Publisher<T> toPublisher() {
requireBatchMode("toPublisher");
return subscriber -> {
if (handle == 0L) {
subscriber.onSubscribe(new java.util.concurrent.Flow.Subscription() {
public void request(long n) {}
public void cancel() {}
});
subscriber.onComplete();
return;
}
if (!publisherAttached.compareAndSet(false, true)) {
subscriber.onSubscribe(new java.util.concurrent.Flow.Subscription() {
public void request(long n) {}
public void cancel() {}
});
subscriber.onError(new IllegalStateException("Stream publisher already attached"));
return;
}
final class PublisherSubscription implements java.util.concurrent.Flow.Subscription, Runnable {
private static final int WAIT_TIMEOUT_MILLIS = 100;
private final java.util.concurrent.atomic.AtomicBoolean done = new java.util.concurrent.atomic.AtomicBoolean(false);
private final java.util.concurrent.atomic.AtomicLong requested = new java.util.concurrent.atomic.AtomicLong(0L);
private final Object demandMonitor = new Object();
private final Thread worker = new Thread(this, "boltffi-stream-publisher");
void start() {
worker.setDaemon(true);
worker.start();
}
@Override
public void request(long n) {
if (done.get()) return;
if (n <= 0L) {
fail(new IllegalArgumentException("Flow subscription request must be positive"));
return;
}
addRequested(n);
synchronized (demandMonitor) {
demandMonitor.notifyAll();
}
}
@Override
public void cancel() {
if (!done.compareAndSet(false, true)) return;
StreamSubscription.this.close();
synchronized (demandMonitor) {
demandMonitor.notifyAll();
}
}
@Override
public void run() {
try {
while (!done.get()) {
waitForDemand();
if (done.get()) return;
long batchSize = Math.max(1L, Math.min(requested.get(), 256L));
byte[] bytes = popBatchFn.apply(handle, batchSize);
if (bytes == null) throw new RuntimeException("BoltFFI: stream pop_batch failed (null)");
if (bytes.length == 0) {
int waitResult = waitFn.apply(handle, WAIT_TIMEOUT_MILLIS);
if (waitResult < 0) {
complete();
return;
}
continue;
}
java.util.List<T> items = decodeItems.apply(bytes);
for (T item : items) {
if (done.get()) return;
subscriber.onNext(item);
consumeDemand();
}
}
} catch (Throwable throwable) {
fail(throwable);
}
}
private void waitForDemand() throws InterruptedException {
synchronized (demandMonitor) {
while (!done.get() && requested.get() == 0L) {
demandMonitor.wait();
}
}
}
private void addRequested(long value) {
requested.updateAndGet(current -> {
if (current == Long.MAX_VALUE) return Long.MAX_VALUE;
long next = current + value;
return next < 0L ? Long.MAX_VALUE : next;
});
}
private void consumeDemand() {
requested.updateAndGet(current -> current == Long.MAX_VALUE ? Long.MAX_VALUE : Math.max(0L, current - 1L));
}
private void complete() {
if (!done.compareAndSet(false, true)) return;
try {
subscriber.onComplete();
} finally {
StreamSubscription.this.close();
}
}
private void fail(Throwable throwable) {
if (!done.compareAndSet(false, true)) return;
try {
subscriber.onError(throwable);
} finally {
StreamSubscription.this.close();
}
}
}
PublisherSubscription publisherSubscription = new PublisherSubscription();
subscriber.onSubscribe(publisherSubscription);
publisherSubscription.start();
};
}
{%- endif %}
private void requireBatchMode(String operation) {
if (mode == Mode.BATCH) return;
throw new IllegalStateException(operation + " is only available for batch stream subscriptions");
}
}
{%- endif %}