public java.util.stream.Stream<{{ item_type }}> {{ method_name }}(final {{ request_type }} {{ request_param }}) throws {{ exception_class }} {
java.util.Objects.requireNonNull({{ request_param }}, "{{ request_param }} must not be null");
final MemorySegment streamHandle;
try (var arena = Arena.ofShared()) {
String requestJson = STREAM_MAPPER.writeValueAsString({{ request_param }});
var cRequestJson = arena.allocateFrom(requestJson);
MemorySegment requestPtr = (MemorySegment) NativeLib.{{ req_from_json }}.invoke(cRequestJson);
if (requestPtr.equals(MemorySegment.NULL)) {
checkLastFfiError();
throw new {{ exception_class }}("{{ method_name }}: failed to marshal request", (Throwable) null);
}
try {
streamHandle = (MemorySegment) NativeLib.{{ start_handle }}.invoke(this.handle, requestPtr);
} finally {
NativeLib.{{ req_free }}.invoke(requestPtr);
}
} catch (Throwable e) {
if (e instanceof {{ exception_class }} ex) { throw ex; }
throw new {{ exception_class }}("{{ method_name }}: failed to start stream", e);
}
if (streamHandle == null || streamHandle.equals(MemorySegment.NULL)) {
checkLastFfiError();
throw new {{ exception_class }}("{{ method_name }}: stream handle was null", (Throwable) null);
}
final MemorySegment finalStreamHandle = streamHandle;
java.util.Iterator<{{ item_type }}> underlyingIterator = new java.util.Iterator<{{ item_type }}>() {
private {{ item_type }} pending = pull();
private boolean closed = false;
private {{ item_type }} pull() {
if (closed) { return null; }
MemorySegment chunkPtr;
try {
chunkPtr = (MemorySegment) NativeLib.{{ next_handle }}.invoke(finalStreamHandle);
} catch (Throwable e) {
closeStream();
throw new RuntimeException(new {{ exception_class }}("{{ method_name }}: stream advance failed", e));
}
if (chunkPtr.equals(MemorySegment.NULL)) {
closeStream();
int code;
try {
code = (int) NativeLib.{{ prefix_upper }}_LAST_ERROR_CODE.invoke();
} catch (Throwable e) {
throw new RuntimeException(new {{ exception_class }}("{{ method_name }}: failed to read last_error_code", e));
}
if (code != 0) {
String msg;
try {
MemorySegment ctxPtr = (MemorySegment) NativeLib.{{ prefix_upper }}_LAST_ERROR_CONTEXT.invoke();
msg = ctxPtr.equals(MemorySegment.NULL) ? "unknown" : ctxPtr.reinterpret(Long.MAX_VALUE).getString(0);
} catch (Throwable e) {
throw new RuntimeException(new {{ exception_class }}(code, "{{ method_name }}: failed to read error context"));
}
throw new RuntimeException(new {{ exception_class }}(code, msg));
}
return null;
}
try {
MemorySegment jsonPtr = (MemorySegment) NativeLib.{{ item_to_json }}.invoke(chunkPtr);
if (jsonPtr.equals(MemorySegment.NULL)) {
NativeLib.{{ item_free }}.invoke(chunkPtr);
throw new RuntimeException(new {{ exception_class }}("{{ method_name }}: failed to serialize chunk", (Throwable) null));
}
String json = jsonPtr.reinterpret(Long.MAX_VALUE).getString(0);
NativeLib.{{ prefix_upper }}_FREE_STRING.invoke(jsonPtr);
NativeLib.{{ item_free }}.invoke(chunkPtr);
return STREAM_MAPPER.readValue(json, {{ item_type }}.class);
} catch (Throwable e) {
closeStream();
throw new RuntimeException(new {{ exception_class }}("{{ method_name }}: failed to deserialize chunk", e));
}
}
private void closeStream() {
if (closed) { return; }
closed = true;
try {
NativeLib.{{ free_handle }}.invoke(finalStreamHandle);
} catch (Throwable ignore) {
// best-effort cleanup
}
}
@Override
public boolean hasNext() {
return pending != null;
}
@Override
public {{ item_type }} next() {
if (pending == null) {
throw new java.util.NoSuchElementException();
}
{{ item_type }} current = pending;
pending = pull();
return current;
}
};
return java.util.stream.StreamSupport.stream(
java.util.Spliterators.spliteratorUnknownSize(
underlyingIterator,
java.util.Spliterator.ORDERED | java.util.Spliterator.NONNULL
),
false
);
}