type {{ stream_item_alias }} = std::result::Result<{{ item_type }}, Box<dyn std::error::Error + Send + Sync + 'static>>;
type {{ stream_box_alias }} = BoxStream<'static, {{ stream_item_alias }}>;
struct {{ stream_handle_type }} {
rt: &'static Runtime,
stream: Mutex<Option<{{ stream_box_alias }}>>,
}
#[unsafe(no_mangle)]
pub unsafe extern "system" fn {{ start_sym }}(
mut env: EnvUnowned,
_class: JClass,
client_handle: jlong,
request_json: JString,
) -> jlong {
// SAFETY: env is a valid EnvUnowned passed by the JVM for this native call frame.
let mut __jni_attach_guard = unsafe { jni::AttachGuard::from_unowned(env.as_raw()) };
let env = __jni_attach_guard.borrow_env_mut();
// SAFETY: client_handle was produced by the matching constructor shim.
let client: &core_crate::{{ type_name }} = unsafe { &*(client_handle as *const core_crate::{{ type_name }}) };
let req_str = match jstring_to_string(env, request_json) {
Ok(s) => s,
Err(e) => { throw_jni_error(env, &format!("{e}")); return 0; }
};
{{ request_unmarshal }}{{ stream_call_block }} let stream = match stream_result {
Ok(s) => s,
Err(e) => { throw_jni_error(env, &format!("{e}")); return 0; }
};
// Map the concrete error type to Box<dyn Error> so the handle type is
// independent of the stream's error associated type.
let mapped = {
use futures_util::StreamExt;
Box::pin(stream.map(|r| r.map_err(|e| -> Box<dyn std::error::Error + Send + Sync + 'static> { Box::new(e) })))
};
let handle = Box::new({{ stream_handle_type }} {
rt: runtime(),
stream: Mutex::new(Some(mapped)),
});
Box::into_raw(handle) as jlong
}
#[unsafe(no_mangle)]
pub unsafe extern "system" fn {{ next_sym }}(
mut env: EnvUnowned,
_class: JClass,
stream_handle: jlong,
) -> jstring {
// SAFETY: env is a valid EnvUnowned passed by the JVM for this native call frame.
let mut __jni_attach_guard = unsafe { jni::AttachGuard::from_unowned(env.as_raw()) };
let env = __jni_attach_guard.borrow_env_mut();
if stream_handle == 0 { return std::ptr::null_mut(); }
// SAFETY: stream_handle was produced by the matching Start shim.
let h = unsafe { &*(stream_handle as *const {{ stream_handle_type }}) };
let mut guard = match h.stream.lock() {
Ok(g) => g,
Err(_) => return std::ptr::null_mut(),
};
let Some(stream) = guard.as_mut() else { return std::ptr::null_mut(); };
let Some(next) = run_or_throw(env, std::panic::AssertUnwindSafe(|| h.rt.block_on(stream.next()))) else {
return std::ptr::null_mut();
};
match next {
None => std::ptr::null_mut(),
Some(Err(e)) => {
throw_jni_error(env, &format!("{e}"));
std::ptr::null_mut()
}
Some(Ok(chunk)) => {
let s = match serde_json::to_string(&chunk) {
Ok(s) => s,
Err(e) => {
throw_jni_error(env, &format!("serialize: {e}")); return std::ptr::null_mut();
}
};
string_to_jstring(env, &s)
}
}
}
#[unsafe(no_mangle)]
pub unsafe extern "system" fn {{ free_sym }}(
_env: EnvUnowned,
_class: JClass,
stream_handle: jlong,
) {
if stream_handle == 0 { return; }
// SAFETY: stream_handle was produced by the matching Start shim.
unsafe { let _ = Box::from_raw(stream_handle as *mut {{ stream_handle_type }}); }
}