/// <summary>
/// Event codes. <c>AsyncSupport.WaitableSetWait</c> casts the integer returned by the
/// waitable-set wait import to this enum.
/// </summary>
public enum EventCode : int
{
    None        = 0x0,
    Subtask     = 0x1,
    StreamRead  = 0x2,
    StreamWrite = 0x3,
    FutureRead  = 0x4,
    FutureWrite = 0x5,
    Cancel      = 0x6,
}
/// <summary>
/// Codes returned from <c>AsyncSupport.Callback</c>. That method returns
/// <see cref="Exit"/> when the waitable set has no pending entries left, and
/// <see cref="Wait"/> OR-ed with the waitable-set handle shifted left by four bits otherwise.
/// </summary>
public enum CallbackCode : uint
{
    Exit  = 0x0,
    Yield = 0x1,
    Wait  = 0x2,
}
/// <summary>
/// Per-task context record. <c>AsyncSupport</c> allocates it in unmanaged memory and
/// hands a pointer to it to the context-set / context-get imports.
/// </summary>
[global::System.Runtime.InteropServices.StructLayout(global::System.Runtime.InteropServices.LayoutKind.Sequential)]
public struct ContextTask
{
    /// <summary>Handle of the waitable set that pending waitables of this task are joined to.</summary>
    public int WaitableSetHandle;

    /// <summary>Not read or written by the code visible in this file.</summary>
    public int FutureHandle;
}
public static class AsyncSupport
{
private static ConcurrentDictionary<int, ConcurrentDictionary<int, WaitableInfoState>> pendingTasks = new ();
/// <summary>
/// Import declaration for the <c>poll</c> entry point of the <c>wasi:io/poll@0.2.0</c> module.
/// </summary>
internal static class PollWasmInterop
{
    [DllImport("wasi:io/poll@0.2.0", EntryPoint = "poll")]
    [WasmImportLinkage]
    internal static extern void wasmImportPoll(nint p0, int p1, nint p2);
}
/// <summary>
/// Raw import declarations, all bound to the "$root" module. The public wrappers on
/// <c>AsyncSupport</c> forward to these.
/// </summary>
private static class Interop
{
// Entry point "[waitable-set-new]": returns the handle of a new waitable set.
[global::System.Runtime.InteropServices.DllImport("$root", EntryPoint = "[waitable-set-new]"), global::System.Runtime.InteropServices.WasmImportLinkageAttribute]
internal static extern int WaitableSetNew();
// Entry point "[waitable-join]": callers pass the waitable handle first and the set handle second.
[global::System.Runtime.InteropServices.DllImport("$root", EntryPoint = "[waitable-join]"), global::System.Runtime.InteropServices.WasmImportLinkageAttribute]
internal static extern void WaitableJoin(int waitable, int set);
// Entry point "[waitable-set-wait]": the first parameter is named `waitable`, but the caller
// passes a waitable-set handle. The caller supplies a two-uint buffer and reads both words back.
[global::System.Runtime.InteropServices.DllImport("$root", EntryPoint = "[waitable-set-wait]"), global::System.Runtime.InteropServices.WasmImportLinkageAttribute]
internal static unsafe extern int WaitableSetWait(int waitable, uint* waitableHandlePtr);
// Entry point "[waitable-set-poll]".
// NOTE(review): declared as returning uint while WaitableSetWait returns int — confirm which is intended.
[global::System.Runtime.InteropServices.DllImport("$root", EntryPoint = "[waitable-set-poll]"), global::System.Runtime.InteropServices.WasmImportLinkageAttribute]
internal static unsafe extern uint WaitableSetPoll(int waitable, uint* waitableHandlePtr);
// Entry point "[waitable-set-drop]".
[global::System.Runtime.InteropServices.DllImport("$root", EntryPoint = "[waitable-set-drop]"), global::System.Runtime.InteropServices.WasmImportLinkageAttribute]
internal static extern void WaitableSetDrop(int waitable);
// Entry point "[context-set-0]": stores a pointer to the task's ContextTask (callers also pass null to clear it).
[global::System.Runtime.InteropServices.DllImport("$root", EntryPoint = "[context-set-0]"), global::System.Runtime.InteropServices.WasmImportLinkageAttribute]
internal static unsafe extern void ContextSet(ContextTask* waitable);
// Entry point "[context-get-0]": returns the stored ContextTask pointer; callers treat null as "no context yet".
[global::System.Runtime.InteropServices.DllImport("$root", EntryPoint = "[context-get-0]"), global::System.Runtime.InteropServices.WasmImportLinkageAttribute]
internal static unsafe extern ContextTask* ContextGet();
}
/// <summary>
/// Creates a waitable set through the import, logs its number to the console and returns it.
/// </summary>
/// <returns>The handle reported by the import.</returns>
public static int WaitableSetNew()
{
    int createdSet = Interop.WaitableSetNew();
    Console.WriteLine($"WaitableSet created with number {createdSet}");
    return createdSet;
}
/// <summary>
/// Polls a waitable set through the import and throws when the import returns a non-zero value.
/// </summary>
/// <param name="waitableHandle">Handle passed to the poll import.</param>
/// <exception cref="Exception">The import returned a non-zero value.</exception>
/// <remarks>
/// NOTE(review): the message calls the value an "error code"; by analogy with the wait import,
/// whose return value is cast to <c>EventCode</c>, it may actually be an event code — confirm.
/// </remarks>
public static unsafe void WaitableSetPoll(int waitableHandle)
{
    // The import takes a pointer for its payload (the sibling wait import is given a two-word
    // buffer). Passing null would make the host write to address 0, so supply real storage.
    uint* payload = stackalloc uint[2];
    var error = Interop.WaitableSetPoll(waitableHandle, payload);
    if (error != 0)
    {
        throw new Exception($"WaitableSetPoll failed with error code: {error}");
    }
}
/// <summary>
/// Records the completion state for a subtask and joins the subtask's handle to the waitable set.
/// </summary>
internal static void Join(SubtaskStatus subtask, int waitableSetHandle, WaitableInfoState waitableInfoState)
    => Join(subtask.Handle, waitableSetHandle, waitableInfoState);

/// <summary>
/// Records the completion state for a reader/writer handle, then joins that handle to the
/// waitable set. The state is registered before the import is called.
/// </summary>
internal static void Join(int readerWriterHandle, int waitableHandle, WaitableInfoState waitableInfoState)
{
    AddTaskToWaitables(waitableHandle, readerWriterHandle, waitableInfoState);
    Interop.WaitableJoin(readerWriterHandle, waitableHandle);
}

/// <summary>
/// Calls the join import with set handle 0 for the given waitable; no state is recorded.
/// </summary>
public static void Join(int handle) => Interop.WaitableJoin(handle, 0);
/// <summary>
/// Stores <paramref name="waitableInfoState"/> under the waitable's handle in the per-set
/// dictionary, creating that dictionary on first use. An existing entry for the same
/// waitable handle is overwritten.
/// </summary>
private static void AddTaskToWaitables(int waitableSetHandle, int waitableHandle, WaitableInfoState waitableInfoState)
{
    Console.WriteLine($"Adding waitable {waitableHandle} to set {waitableSetHandle}");
    pendingTasks
        .GetOrAdd(waitableSetHandle, _ => new ConcurrentDictionary<int, WaitableInfoState>())
        [waitableHandle] = waitableInfoState;
}
/// <summary>
/// Calls the wait import for a waitable set and packages the returned code together with
/// the two payload words the import wrote into an <c>EventWaitable</c>.
/// </summary>
/// <param name="waitableSetHandle">Handle of the set to wait on.</param>
public static unsafe EventWaitable WaitableSetWait(int waitableSetHandle)
{
    // Two-word scratch area the import fills in.
    uint* payload = stackalloc uint[2];
    var code = (EventCode)Interop.WaitableSetWait(waitableSetHandle, payload);
    uint first = payload[0];
    uint second = payload[1];
    return new EventWaitable(code, first, second);
}
/// <summary>Forwards <paramref name="handle"/> to the waitable-set drop import.</summary>
public static void WaitableSetDrop(int handle) => Interop.WaitableSetDrop(handle);
/// <summary>Forwards the context pointer (which may be null) to the context-set import.</summary>
public static unsafe void ContextSet(ContextTask* contextTask) => Interop.ContextSet(contextTask);
/// <summary>Returns the pointer reported by the context-get import; callers check it for null.</summary>
public static unsafe ContextTask* ContextGet() => Interop.ContextGet();
/// <summary>
/// Handles one event for the current task: completes the pending state registered for
/// <c>e.Waitable</c> and reports whether the task should exit or keep waiting.
/// </summary>
/// <param name="e">The event to process.</param>
/// <param name="contextPtr">Not used by this body; the context is re-read via <c>ContextGet()</c>.</param>
/// <param name="taskReturn">Invoked once the waitable set has no pending entries left.</param>
/// <returns>
/// <c>CallbackCode.Exit</c> when no waitables remain; otherwise <c>CallbackCode.Wait</c>
/// OR-ed with the waitable-set handle shifted left by four bits.
/// </returns>
/// <exception cref="NotImplementedException">The event is neither completed nor dropped.</exception>
public static unsafe uint Callback(EventWaitable e, ContextTask* contextPtr, Action taskReturn)
{
Console.WriteLine($"Callback Event code {e.EventCode} Code {e.Code} Waitable {e.Waitable} Waitable Status {e.WaitableStatus.State}, Count {e.WaitableCount}");
ContextTask* contextTaskPtr = ContextGet();
// Both indexer reads throw KeyNotFoundException when the set or the waitable was never registered.
var waitables = pendingTasks[contextTaskPtr->WaitableSetHandle];
var waitableInfoState = waitables[e.Waitable];
if (e.IsDropped)
{
Console.WriteLine("Dropped.");
// NOTE(review): states built with the TaskCompletionSource-only constructor never assign
// FutureStream, so this dereference would fail for them — confirm subtasks cannot report IsDropped.
waitableInfoState.FutureStream.OtherSideDropped();
}
if (e.IsCompleted || e.IsDropped)
{
Console.WriteLine("Setting the result");
// Unregister before completing so the Count check below reflects the remaining waitables.
waitables.Remove(e.Waitable, out _);
if (e.IsSubtask)
{
waitableInfoState.SetResult(0 );
}
else
{
// Stream/future operation: release the pinned buffer, then fault or complete the task.
waitableInfoState.FutureStream.FreeBuffer();
if (e.IsDropped)
{
waitableInfoState.SetException(new StreamDroppedException());
}
else
{
waitableInfoState.SetResult(e.WaitableCount);
}
}
if (waitables.Count == 0)
{
Console.WriteLine($"No more waitables for waitable {e.Waitable} in set {contextTaskPtr->WaitableSetHandle}");
// Order matters: return the task result, clear the stored context, then free its memory.
// NOTE(review): the empty dictionary stays in pendingTasks and the waitable set is not dropped here — confirm intended.
taskReturn();
ContextSet(null);
Marshal.FreeHGlobal((IntPtr)contextTaskPtr);
return (uint)CallbackCode.Exit;
}
Console.WriteLine("More waitables in the set.");
return (uint)CallbackCode.Wait | (uint)(contextTaskPtr->WaitableSetHandle << 4);
}
throw new NotImplementedException($"WaitableStatus not implemented {e.WaitableStatus.State} in set {contextTaskPtr->WaitableSetHandle}");
}
/// <summary>
/// Turns a raw subtask status word into a <see cref="Task"/>.
/// </summary>
/// <param name="status">Raw status word; it is handed to <c>SubtaskStatus</c> for decoding.</param>
/// <returns>
/// A pending task joined to the current waitable set when the subtask is starting or started,
/// or <see cref="Task.CompletedTask"/> when it has already returned.
/// </returns>
/// <exception cref="Exception">The status is none of starting, started or returned.</exception>
internal static unsafe Task TaskFromStatus(uint status)
{
    var subtaskStatus = new SubtaskStatus(status);
    // Only the low four bits are reported in the error message below.
    status &= 0xF;

    bool stillRunning = subtaskStatus.IsSubtaskStarting || subtaskStatus.IsSubtaskStarted;
    if (!stillRunning)
    {
        if (subtaskStatus.IsSubtaskReturned)
        {
            return Task.CompletedTask;
        }

        throw new Exception($"unexpected subtask status: {status}");
    }

    // Reuse the task's context when one exists; otherwise create one with a fresh waitable set.
    ContextTask* context = ContextGet();
    if (context == null)
    {
        context = AllocateAndSetNewContext();
        Console.WriteLine($"TaskFromStatus creating WaitableSet {context->WaitableSetHandle}");
    }

    var completion = new TaskCompletionSource();
    Join(subtaskStatus, context->WaitableSetHandle, new WaitableInfoState(completion));
    return completion.Task;
}
/// <summary>
/// Turns a raw subtask status word into a <see cref="Task{T}"/> whose result is produced by
/// <paramref name="liftFunc"/>.
/// </summary>
/// <param name="status">Raw status word; it is handed to <c>SubtaskStatus</c> for decoding.</param>
/// <param name="liftFunc">Produces the result value; called once the subtask has returned.</param>
/// <returns>
/// A completed task when the subtask has already returned. Otherwise a task that completes,
/// with the value of <paramref name="liftFunc"/>, after the subtask's waitable is completed.
/// </returns>
/// <exception cref="Exception">The status is none of starting, started or returned.</exception>
/// <remarks>
/// Previously the pending branch returned a task that was never joined to the waitable set
/// and therefore could never complete. It now mirrors the non-generic overload.
/// </remarks>
public static unsafe Task<T> TaskFromStatus<T>(uint status, Func<T> liftFunc)
{
    var subtaskStatus = new SubtaskStatus(status);
    // Only the low four bits are reported in the error message below.
    status = status & 0xF;

    if (subtaskStatus.IsSubtaskStarting || subtaskStatus.IsSubtaskStarted)
    {
        ContextTask* contextTaskPtr = ContextGet();
        if (contextTaskPtr == null)
        {
            Console.WriteLine("TaskFromStatus<T> creating WaitableSet");
            contextTaskPtr = AllocateAndSetNewContext();
        }

        // WaitableInfoState only carries a void or int completion source, so register a void
        // one for the subtask and lift the typed result when it fires.
        var completion = new TaskCompletionSource();
        Join(subtaskStatus, contextTaskPtr->WaitableSetHandle, new WaitableInfoState(completion));
        return LiftWhenCompleted(completion.Task, liftFunc);
    }

    if (subtaskStatus.IsSubtaskReturned)
    {
        return Task.FromResult(liftFunc());
    }

    throw new Exception($"unexpected subtask status: {status}");
}

// Awaits the subtask's completion and then lifts the typed result. Kept outside the unsafe
// method because await is not allowed in an unsafe context. No ConfigureAwait is used, so the
// continuation is scheduled exactly as an await in calling code would be.
private static async Task<T> LiftWhenCompleted<T>(Task completion, Func<T> liftFunc)
{
    await completion;
    return liftFunc();
}
/// <summary>
/// Allocates a <c>ContextTask</c> in unmanaged memory, gives it a newly created waitable set
/// and registers it through the context-set import.
/// </summary>
/// <returns>
/// Pointer to the new context. The visible code frees it with <c>Marshal.FreeHGlobal</c> in
/// <c>Callback</c> when the task exits.
/// </returns>
internal static unsafe ContextTask* AllocateAndSetNewContext()
{
    var contextTaskPtr = (ContextTask*)Marshal.AllocHGlobal(Marshal.SizeOf<ContextTask>());
    // AllocHGlobal does not zero-fill. Write the whole struct so FutureHandle starts at 0
    // instead of holding whatever bytes were already in the allocation.
    *contextTaskPtr = new ContextTask { WaitableSetHandle = AsyncSupport.WaitableSetNew() };
    AsyncSupport.ContextSet(contextTaskPtr);
    return contextTaskPtr;
}
}
// Delegate shapes used as entries of FutureVTable / StreamVTable.

/// <summary>Creates a reader/writer pair. FutureHelpers takes the reader handle from the low 32 bits and the writer handle from the high 32 bits.</summary>
public delegate ulong New();
/// <summary>Future read; invoked with the reader handle and a buffer pointer. ReaderBase wraps the returned value in a WaitableStatus.</summary>
public delegate uint FutureRead(int handle, IntPtr buffer);
/// <summary>Drops a reader; invoked with the reader handle.</summary>
public delegate void DropReader(int handle);
/// <summary>Drops a writer; invoked with the writer handle.</summary>
public delegate void DropWriter(int handle);
/// <summary>Future write; invoked with the writer handle and a buffer pointer. WriterBase wraps the returned value in a WaitableStatus.</summary>
public delegate uint FutureWrite(int handle, IntPtr buffer);
/// <summary>Stream write; invoked with the writer handle, a buffer pointer and a length.</summary>
public delegate uint StreamWrite(int handle, IntPtr buffer, uint length);
/// <summary>Stream read; invoked with the reader handle, a buffer pointer and a length.</summary>
public delegate uint StreamRead(int handle, IntPtr buffer, uint length);
/// <summary>Payload-lowering hook. The visible code only checks it for null (StreamWriter&lt;T&gt;.LowerPayload) and never invokes it.</summary>
public delegate void Lower(object payload, uint size);
/// <summary>
/// Set of delegates that the future reader and writer classes call to create, read, write
/// and drop a future.
/// </summary>
public struct FutureVTable
{
// Called by FutureHelpers.RawFutureNew to obtain the packed reader/writer handles.
public New New;
// Called by the FutureReader classes' VTableRead.
public FutureRead Read;
// Called by the FutureWriter classes' VTableWrite.
public FutureWrite Write;
// Called by the FutureReader classes' VTableDrop.
public DropReader DropReader;
// Called by the FutureWriter classes' VTableDrop.
public DropWriter DropWriter;
}
/// <summary>
/// Set of delegates that the stream reader and writer classes call to create, read, write
/// and drop a stream.
/// </summary>
public struct StreamVTable
{
// Called by FutureHelpers.RawStreamNew to obtain the packed reader/writer handles.
public New New;
// Called by the StreamReader classes' VTableRead.
public StreamRead Read;
// Called by the StreamWriter classes' VTableWrite.
public StreamWrite Write;
// Called by the StreamReader classes' VTableDrop.
public DropReader DropReader;
// Called by the StreamWriter classes' VTableDrop.
public DropWriter DropWriter;
// Optional. StreamWriter<T>.LowerPayload pins the payload directly when this is null and
// throws NotSupportedException when it is set.
public Lower? Lower;
}
/// <summary>
/// Operations <c>AsyncSupport.Callback</c> invokes on the reader or writer that owns a
/// pending stream/future operation.
/// </summary>
internal interface IFutureStream : IDisposable
{
/// <summary>Releases the buffer handle held for the current operation; ReaderBase and WriterBase free their pinned GCHandle here.</summary>
void FreeBuffer();
/// <summary>Called by Callback when the event reports IsDropped; ReaderBase and WriterBase set a flag that makes later operations throw StreamDroppedException.</summary>
void OtherSideDropped();
}
/// <summary>
/// Factory helpers that call a vtable's <c>New</c> delegate and wrap the two handles it
/// returns in the matching reader/writer objects.
/// </summary>
public static class FutureHelpers
{
    // Low 32 bits of the packed value are the reader handle, high 32 bits the writer handle.
    private static (int Reader, int Writer) SplitHandles(ulong packed)
    {
        return ((int)(packed & 0xFFFFFFFF), (int)(packed >> 32));
    }

    /// <summary>Creates an untyped future reader/writer pair.</summary>
    internal static (FutureReader, FutureWriter) RawFutureNew(FutureVTable vtable)
    {
        var (reader, writer) = SplitHandles(vtable.New());
        return (new FutureReader(reader, vtable), new FutureWriter(writer, vtable));
    }

    /// <summary>Creates a typed future reader/writer pair.</summary>
    internal static (FutureReader<T>, FutureWriter<T>) RawFutureNew<T>(FutureVTable vtable)
    {
        var (reader, writer) = SplitHandles(vtable.New());
        return (new FutureReader<T>(reader, vtable), new FutureWriter<T>(writer, vtable));
    }

    /// <summary>Creates an untyped stream reader/writer pair.</summary>
    internal static (StreamReader, StreamWriter) RawStreamNew(StreamVTable vtable)
    {
        var (reader, writer) = SplitHandles(vtable.New());
        return (new StreamReader(reader, vtable), new StreamWriter(writer, vtable));
    }

    /// <summary>Creates a typed stream reader/writer pair and logs both handles.</summary>
    internal static (StreamReader<T>, StreamWriter<T>) RawStreamNew<T>(StreamVTable vtable)
    {
        var (reader, writer) = SplitHandles(vtable.New());
        Console.WriteLine($"Creating reader<T> with handle {reader}");
        Console.WriteLine($"Creating writer<T> with handle {writer}");
        return (new StreamReader<T>(reader, vtable), new StreamWriter<T>(writer, vtable));
    }
}
/// <summary>
/// Shared implementation for future and stream readers: owns the reader handle, pins the
/// destination buffer for the duration of a read, and turns the vtable read status into a task.
/// </summary>
/// <remarks>
/// A handle value of 0 means the handle has been taken or dropped.
/// </remarks>
public abstract class ReaderBase : IFutureStream
{
    // Pinned destination buffer of the read in progress, if any.
    private GCHandle? bufferHandle;
    // Set when a callback event reported that the other side was dropped.
    private bool writerDropped;

    internal ReaderBase(int handle)
    {
        Handle = handle;
    }

    internal int Handle { get; private set; }

    /// <summary>Gives up ownership of the handle and returns it; the reader is unusable afterwards.</summary>
    /// <exception cref="InvalidOperationException">The handle was already taken.</exception>
    internal int TakeHandle()
    {
        if (Handle == 0)
        {
            throw new InvalidOperationException("Handle already taken");
        }

        var handle = Handle;
        Handle = 0;
        return handle;
    }

    /// <summary>Performs the vtable read for the concrete reader type and returns the raw status word.</summary>
    internal abstract uint VTableRead(IntPtr bufferPtr, int length);

    /// <summary>
    /// Starts a read. <paramref name="liftBuffer"/> supplies the pinned destination buffer
    /// (or null when there is none).
    /// </summary>
    /// <returns>
    /// A completed task carrying the status count when the read finished immediately, or a
    /// pending task registered with the current waitable set when the read is blocked.
    /// </returns>
    /// <exception cref="InvalidOperationException">The handle was already taken.</exception>
    /// <exception cref="StreamDroppedException">The other side was previously reported as dropped.</exception>
    /// <exception cref="NotImplementedException">The read returned a status other than blocked or completed.</exception>
    internal unsafe Task<int> ReadInternal(Func<GCHandle?> liftBuffer, int length)
    {
        if (Handle == 0)
        {
            throw new InvalidOperationException("Handle already taken");
        }

        if (writerDropped)
        {
            throw new StreamDroppedException();
        }

        bufferHandle = liftBuffer();
        var status = new WaitableStatus(VTableRead(bufferHandle == null ? IntPtr.Zero : bufferHandle.Value.AddrOfPinnedObject(), length));

        if (status.IsBlocked)
        {
            Console.WriteLine("Read Blocked");
            var tcs = new TaskCompletionSource<int>();
            ContextTask* contextTaskPtr = AsyncSupport.ContextGet();
            if (contextTaskPtr == null)
            {
                Console.WriteLine("FutureReader Read Blocked creating WaitableSet");
                contextTaskPtr = AsyncSupport.AllocateAndSetNewContext();
            }

            Console.WriteLine("blocked read before join");
            // The buffer stays pinned; AsyncSupport.Callback calls FreeBuffer when the event arrives.
            AsyncSupport.Join(Handle, contextTaskPtr->WaitableSetHandle, new WaitableInfoState(tcs, this));
            Console.WriteLine("blocked read after join");
            return tcs.Task;
        }

        // The operation is over in every remaining case, so the pin must not outlive it.
        // Previously it was left allocated here, unlike the write path.
        ReleaseBuffer();

        if (status.IsCompleted)
        {
            return Task.FromResult((int)status.Count);
        }

        throw new NotImplementedException(status.State.ToString());
    }

    // Frees the pinned buffer once and forgets it, so a later call cannot free a stale copy.
    private void ReleaseBuffer()
    {
        if (bufferHandle is GCHandle pinned)
        {
            bufferHandle = null;
            if (pinned.IsAllocated)
            {
                pinned.Free();
            }
        }
    }

    void IFutureStream.FreeBuffer()
    {
        ReleaseBuffer();
    }

    void IFutureStream.OtherSideDropped()
    {
        writerDropped = true;
    }

    /// <summary>Drops the reader handle through the concrete vtable.</summary>
    internal abstract void VTableDrop();

    void Dispose(bool _disposing)
    {
        if (Handle != 0)
        {
            VTableDrop();
            // Clear after the drop (VTableDrop reads Handle) so a second Dispose is a no-op.
            Handle = 0;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    ~ReaderBase()
    {
        Dispose(false);
    }
}
/// <summary>Reader for a future that carries no payload buffer.</summary>
public class FutureReader : ReaderBase
{
    internal FutureReader(int handle, FutureVTable vTable)
        : base(handle)
    {
        this.VTable = vTable;
    }

    internal FutureVTable VTable { get; private set; }

    /// <summary>Starts a read with no buffer and a length of 0.</summary>
    public Task Read() => ReadInternal(() => null, 0);

    // The future read delegate takes no length, so `length` is ignored here.
    internal override uint VTableRead(IntPtr ptr, int length) => VTable.Read(Handle, ptr);

    internal override void VTableDrop() => VTable.DropReader(Handle);
}
/// <summary>Reader for a future of <typeparamref name="T"/>; only <c>byte</c> is supported for now.</summary>
public class FutureReader<T> : ReaderBase
{
    public FutureReader(int handle, FutureVTable vTable)
        : base(handle)
    {
        VTable = vTable;
    }

    public FutureVTable VTable { get; private set; }

    // Pins the supplied value; any T other than byte is rejected.
    private GCHandle LiftBuffer(T buffer)
    {
        if (typeof(T) != typeof(byte))
        {
            throw new NotImplementedException("reading from futures types that require lifting");
        }

        return GCHandle.Alloc(buffer, GCHandleType.Pinned);
    }

    /// <summary>Starts a read into <paramref name="buffer"/> with a length of 1.</summary>
    public Task Read(T buffer) => ReadInternal(() => LiftBuffer(buffer), 1);

    // The future read delegate takes no length, so `length` is ignored here.
    internal override uint VTableRead(IntPtr ptr, int length) => VTable.Read(Handle, ptr);

    internal override void VTableDrop() => VTable.DropReader(Handle);
}
/// <summary>Reader for a stream that carries no payload buffer.</summary>
public class StreamReader(int handle, StreamVTable vTable) : ReaderBase(handle)
{
    public StreamVTable VTable { get; private set; } = vTable;

    /// <summary>Starts a read of <paramref name="length"/> items with no destination buffer.</summary>
    public Task Read(int length) => ReadInternal(() => null, length);

    internal override uint VTableRead(IntPtr ptr, int length) => VTable.Read(Handle, ptr, (uint)length);

    internal override void VTableDrop() => VTable.DropReader(Handle);
}
/// <summary>Reader for a stream of <typeparamref name="T"/>; only <c>byte</c> is supported for now.</summary>
public class StreamReader<T> : ReaderBase
{
    public StreamReader(int handle, StreamVTable vTable)
        : base(handle)
    {
        VTable = vTable;
    }

    public StreamVTable VTable { get; private set; }

    // Pins the destination array; any T other than byte is rejected.
    private GCHandle LiftBuffer(T[] buffer)
    {
        if (typeof(T) != typeof(byte))
        {
            throw new NotImplementedException("reading from stream types that require lifting");
        }

        return GCHandle.Alloc(buffer, GCHandleType.Pinned);
    }

    /// <summary>Starts a read of up to <c>buffer.Length</c> items into <paramref name="buffer"/>.</summary>
    /// <returns>A task whose result is the count reported by the read status.</returns>
    public Task<int> Read(T[] buffer) => ReadInternal(() => LiftBuffer(buffer), buffer.Length);

    internal override uint VTableRead(IntPtr ptr, int length) => VTable.Read(Handle, ptr, (uint)length);

    internal override void VTableDrop() => VTable.DropReader(Handle);
}
/// <summary>
/// Shared implementation for future and stream writers: owns the writer handle, pins the
/// source buffer for the duration of a write, and turns the vtable write status into a task.
/// </summary>
/// <remarks>
/// A handle value of 0 means the handle has been taken or dropped.
/// </remarks>
public abstract class WriterBase : IFutureStream
{
    // Pinned source buffer of the write in progress, if any.
    private GCHandle? bufferHandle;
    // Set when a callback event reported that the other side was dropped.
    private bool readerDropped;

    internal WriterBase(int handle)
    {
        Handle = handle;
    }

    internal int Handle { get; private set; }

    /// <summary>Gives up ownership of the handle and returns it; the writer is unusable afterwards.</summary>
    /// <exception cref="InvalidOperationException">The handle was already taken.</exception>
    internal int TakeHandle()
    {
        if (Handle == 0)
        {
            throw new InvalidOperationException("Handle already taken");
        }

        var handle = Handle;
        Handle = 0;
        return handle;
    }

    /// <summary>Performs the vtable write for the concrete writer type and returns the raw status word.</summary>
    internal abstract uint VTableWrite(IntPtr bufferPtr, int length);

    /// <summary>
    /// Starts a write. <paramref name="lowerPayload"/> supplies the pinned source buffer
    /// (or null when there is none).
    /// </summary>
    /// <returns>
    /// A completed task carrying the status count when the write finished immediately, or a
    /// pending task registered with the current waitable set when the write is blocked.
    /// </returns>
    /// <exception cref="InvalidOperationException">The handle was already taken.</exception>
    /// <exception cref="StreamDroppedException">The other side was previously reported as dropped.</exception>
    /// <exception cref="NotImplementedException">The write returned a status other than blocked or completed.</exception>
    internal unsafe Task<int> WriteInternal(Func<GCHandle?> lowerPayload, int length)
    {
        if (Handle == 0)
        {
            throw new InvalidOperationException("Handle already taken");
        }

        if (readerDropped)
        {
            throw new StreamDroppedException();
        }

        bufferHandle = lowerPayload();
        var status = new WaitableStatus(VTableWrite(bufferHandle == null ? IntPtr.Zero : bufferHandle.Value.AddrOfPinnedObject(), length));

        if (status.IsBlocked)
        {
            Console.WriteLine("blocked write");
            var tcs = new TaskCompletionSource<int>();
            ContextTask* contextTaskPtr = AsyncSupport.ContextGet();
            if (contextTaskPtr == null)
            {
                contextTaskPtr = AsyncSupport.AllocateAndSetNewContext();
            }

            Console.WriteLine("blocked write before join");
            // The buffer stays pinned; AsyncSupport.Callback calls FreeBuffer when the event arrives.
            AsyncSupport.Join(Handle, contextTaskPtr->WaitableSetHandle, new WaitableInfoState(tcs, this));
            Console.WriteLine("blocked write after join");
            return tcs.Task;
        }

        // The operation is over in every remaining case. Release the pin before returning or
        // throwing; previously the unsupported-status path leaked it.
        ReleaseBuffer();

        if (status.IsCompleted)
        {
            return Task.FromResult((int)status.Count);
        }

        throw new NotImplementedException($"Unsupported write status {status.State}");
    }

    // Frees the pinned buffer once and forgets it, so a later call cannot free a stale copy.
    private void ReleaseBuffer()
    {
        if (bufferHandle is GCHandle pinned)
        {
            bufferHandle = null;
            if (pinned.IsAllocated)
            {
                pinned.Free();
            }
        }
    }

    void IFutureStream.FreeBuffer()
    {
        ReleaseBuffer();
    }

    void IFutureStream.OtherSideDropped()
    {
        readerDropped = true;
    }

    /// <summary>Drops the writer handle through the concrete vtable.</summary>
    internal abstract void VTableDrop();

    void Dispose(bool _disposing)
    {
        if (Handle != 0)
        {
            VTableDrop();
            // Clear after the drop (VTableDrop reads Handle) so a second Dispose is a no-op.
            Handle = 0;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    ~WriterBase()
    {
        Dispose(false);
    }
}
/// <summary>Writer for a future that carries no payload buffer.</summary>
public class FutureWriter : WriterBase
{
    public FutureWriter(int handle, FutureVTable vTable)
        : base(handle)
    {
        VTable = vTable;
    }

    public FutureVTable VTable { get; private set; }

    /// <summary>Starts a write with no buffer and a length of 0.</summary>
    /// <returns>A task whose result is the count reported by the write status.</returns>
    public Task<int> Write()
    {
        return WriteInternal(() => null, 0);
    }

    // The future write delegate takes no length, so `length` is ignored here.
    internal override uint VTableWrite(IntPtr bufferPtr, int length)
    {
        return VTable.Write(Handle, bufferPtr);
    }

    internal override void VTableDrop()
    {
        VTable.DropWriter(Handle);
    }
}
/// <summary>
/// Writer for a future of <typeparamref name="T"/>. The current <see cref="Write"/> takes no
/// value: it passes no buffer and a length of 1.
/// </summary>
public class FutureWriter<T> : WriterBase
{
    public FutureWriter(int handle, FutureVTable vTable)
        : base(handle)
    {
        VTable = vTable;
    }

    public FutureVTable VTable { get; private set; }

    /// <summary>Starts a write with no buffer and a length of 1.</summary>
    public Task Write()
    {
        return WriteInternal(() => null, 1);
    }

    // The future write delegate takes no length, so `length` is ignored here.
    internal override uint VTableWrite(IntPtr bufferPtr, int length)
    {
        return VTable.Write(Handle, bufferPtr);
    }

    internal override void VTableDrop()
    {
        VTable.DropWriter(Handle);
    }
}
/// <summary>Writer for a stream that carries no payload buffer.</summary>
public class StreamWriter : WriterBase
{
    public StreamWriter(int handle, StreamVTable vTable)
        : base(handle)
    {
        VTable = vTable;
    }

    public StreamVTable VTable { get; private set; }

    /// <summary>Starts a write with no buffer and a length of 0.</summary>
    public Task Write()
    {
        return WriteInternal(() => null, 0);
    }

    internal override uint VTableWrite(IntPtr bufferPtr, int length)
    {
        uint itemCount = (uint)length;
        return VTable.Write(Handle, bufferPtr, itemCount);
    }

    internal override void VTableDrop()
    {
        VTable.DropWriter(Handle);
    }
}
/// <summary>
/// Writer for a stream of <typeparamref name="T"/>. Payload arrays are pinned and written
/// directly; payloads that need lowering are not supported yet.
/// </summary>
/// <remarks>
/// The pinned handle lives in <see cref="WriterBase"/>. The unused private
/// <c>bufferHandle</c> field that used to be declared here was removed because nothing
/// read or wrote it.
/// </remarks>
public class StreamWriter<T>(int handle, StreamVTable vTable) : WriterBase(handle)
{
    public StreamVTable VTable { get; private set; } = vTable;

    // Pins the payload array when the vtable has no Lower hook; otherwise lowering would be
    // required, which is not implemented.
    private GCHandle LowerPayload(T[] payload)
    {
        if (VTable.Lower == null)
        {
            return GCHandle.Alloc(payload, GCHandleType.Pinned);
        }
        else
        {
            throw new NotSupportedException("StreamWriter Write where the payload must be lowered.");
        }
    }

    /// <summary>Starts a write of <c>payload.Length</c> items from <paramref name="payload"/>.</summary>
    /// <returns>A task whose result is the count reported by the write status.</returns>
    /// <exception cref="NotSupportedException">The vtable has a <c>Lower</c> hook.</exception>
    public Task<int> Write(T[] payload)
    {
        return WriteInternal(() => LowerPayload(payload), payload.Length);
    }

    internal override uint VTableWrite(IntPtr bufferPtr, int length)
    {
        return VTable.Write(Handle, bufferPtr, (uint)length);
    }

    internal override void VTableDrop()
    {
        VTable.DropWriter(Handle);
    }
}
/// <summary>
/// Completion state stored for a pending waitable: exactly one of a void or an int
/// completion source, plus (for stream/future operations) the owning reader or writer.
/// </summary>
internal struct WaitableInfoState
{
    private TaskCompletionSource taskCompletionSource;
    private TaskCompletionSource<int> taskCompletionSourceInt;

    /// <summary>Owning reader/writer; left unset by the constructor that takes only a completion source.</summary>
    internal IFutureStream FutureStream;

    internal WaitableInfoState(TaskCompletionSource<int> taskCompletionSource, IFutureStream futureStream)
    {
        this.taskCompletionSourceInt = taskCompletionSource;
        this.FutureStream = futureStream;
    }

    internal WaitableInfoState(TaskCompletionSource taskCompletionSource, IFutureStream futureStream)
    {
        this.taskCompletionSource = taskCompletionSource;
        this.FutureStream = futureStream;
    }

    internal WaitableInfoState(TaskCompletionSource taskCompletionSource)
    {
        this.taskCompletionSource = taskCompletionSource;
    }

    /// <summary>
    /// Completes whichever source is present. The void source ignores
    /// <paramref name="count"/>; the int source receives it as its result.
    /// </summary>
    internal void SetResult(int count)
    {
        if (taskCompletionSource == null)
        {
            taskCompletionSourceInt.SetResult(count);
            return;
        }

        Console.WriteLine("Setting result for void waitable completion source");
        taskCompletionSource.SetResult();
    }

    /// <summary>Faults whichever source is present with <paramref name="e"/>.</summary>
    internal void SetException(Exception e)
    {
        if (taskCompletionSource == null)
        {
            taskCompletionSourceInt.SetException(e);
            return;
        }

        Console.WriteLine("Setting exception waitable completion source");
        taskCompletionSource.SetException(e);
    }
}
/// <summary>
/// Thrown by the reader and writer base classes, and set on pending tasks by
/// <c>AsyncSupport.Callback</c>, when the other side of a stream or future was dropped.
/// </summary>
public class StreamDroppedException : Exception
{
    /// <summary>Creates the exception with the default message.</summary>
    public StreamDroppedException() { }

    /// <summary>Creates the exception with a caller-supplied message.</summary>
    public StreamDroppedException(string message) : base(message) { }
}