zuzu-rust 0.2.0

Rust implementation of ZuzuScript
Documentation
from test/more import *;
from std/task import
	Channel,
	CancellationSource,
	all,
	failed,
	race,
	resolved,
	sleep,
	timeout,
	yield;

let yield_log := "";

async function answer_after_yield () {
	let answer := 42;
	await { yield(); };
	return answer;
}

async function double_after_sleep ( n ) {
	let nap := sleep(0.01);
	await { nap; };
	let doubled := n * 2;
	return doubled;
}

async function fail_after_yield () {
	await { yield(); };
	let boom := new Exception( message: "async-boom" );
	throw boom;
}

async function yield_checkpoint () {
	yield_log := yield_log _ "before";
	await { yield(); };
	yield_log := yield_log _ ",after";
	return yield_log;
}

async function test_basic_tasks () {
	let answer_task := answer_after_yield();
	is( typeof answer_task, "Task", "async function call returns Task" );
	is( answer_task.done(), false, "new async task is not immediately done" );
	let answer_value := await { answer_task; };
	is( answer_value, 42, "async task can be awaited" );
	ok( answer_task.done(), "awaited task is done" );
	is( answer_task.status(), "fulfilled", "awaited task is fulfilled" );

	let ready_task := resolved("ready");
	ok( ready_task.poll(), "resolved task polls as complete" );
	let ready_value := await { ready_task; };
	is( ready_value, "ready", "resolved task awaits value" );

	let saw_task_boom := false;
	try {
		await { failed("task-boom"); };
	}
	catch ( Exception err ) {
		saw_task_boom := err.to_String() eq "Exception: task-boom";
	}
	ok( saw_task_boom, "failed task throws when awaited" );

	let saw_async_boom := false;
	try {
		await { fail_after_yield(); };
	}
	catch ( Exception err ) {
		saw_async_boom := err.to_String() eq "Exception: async-boom";
	}
	ok( saw_async_boom, "async exception propagates through task" );
}

async function test_timers_yield_and_cancellation () {
	let checkpoint_log := await { yield_checkpoint(); };
	is(
		checkpoint_log,
		"before,after",
		"yield can be awaited and resumes the task",
	);

	let sleeper := sleep(1);
	is( sleeper.status(), "sleeping", "sleep task reports sleeping" );
	sleeper.cancel("stop");
	is( sleeper.status(), "cancelled", "cancel changes task status" );
	ok( sleeper.done(), "cancelled task is done" );

	let saw_stop := false;
	try {
		await { sleeper; };
	}
	catch ( CancelledException err ) {
		saw_stop := err.to_String() eq "CancelledException: stop";
	}
	ok( saw_stop, "awaiting cancelled task throws reason" );

	let canceller := new CancellationSource();
	let cancel_token := canceller.token();
	let watched_sleep := sleep(1);
	cancel_token.watch(watched_sleep);
	canceller.cancel("token stop");
	ok( canceller.cancelled(), "cancellation source records cancellation" );
	ok( cancel_token.cancelled(), "cancellation token records cancellation" );

	let expected_reason := "CancelledException: token stop";

	let token_threw := false;
	try {
		cancel_token.throw_if_cancelled();
	}
	catch ( CancelledException err ) {
		token_threw := err.to_String() eq expected_reason;
	}
	ok( token_threw, "token throws stored cancellation reason" );

	let watched_stopped := false;
	try {
		await { watched_sleep; };
	}
	catch ( CancelledException err ) {
		watched_stopped := err.to_String() eq expected_reason;
	}
	ok( watched_stopped, "token cancels watched tasks" );
}

async function test_combinators () {
	let ordered_inputs := [
		resolved("a"),
		double_after_sleep(21),
		answer_after_yield(),
	];
	let ordered_results := await { all(ordered_inputs); };
	is( ordered_results, [ "a", 42, 42 ], "all preserves ordered results" );

	let all_pending := sleep(1);
	let all_combo := all( [ failed("all-boom"), all_pending ] );
	let saw_all_boom := false;
	try {
		await { all_combo; };
	}
	catch ( Exception err ) {
		saw_all_boom := err.to_String() eq "Exception: all-boom";
	}
	ok( saw_all_boom, "all propagates child rejection" );
	let all_pending_status := all_pending.status();
	is(
		all_pending_status,
		"cancelled",
		"all cancels unfinished tasks after rejection",
	);

	let race_pending := sleep(1);
	let race_winner := await {
		race( [ resolved("winner"), race_pending ] );
	};
	is( race_winner, "winner", "race returns first resolved task" );
	is( race_pending.status(), "cancelled", "race cancels losing task" );

	let never_ready := new Channel().recv();
	let saw_timeout := false;
	try {
		await { timeout( 0.01, never_ready ); };
	}
	catch ( TimeoutException err ) {
		saw_timeout := true;
	}
	ok( saw_timeout, "timeout throws TimeoutException" );
	is( never_ready.status(), "cancelled", "timeout cancels wrapped task" );
}

async function test_channels () {
	let fifo := new Channel();

	await { fifo.send("first"); };
	await { fifo.send("second"); };
	let first_item := await { fifo.recv(); };
	is( first_item, "first", "channel receives first FIFO item" );
	let second_item := await { fifo.recv(); };
	is( second_item, "second", "channel receives second FIFO item" );

	let waiting_recv := fifo.recv();
	await { fifo.send("later"); };
	let late_item := await { waiting_recv; };
	is( late_item, "later", "pending recv completes after send" );

	fifo.close();
	let drained_item := await { fifo.recv(); };
	is( drained_item, null, "closed drained channel recv returns null" );

	let send_rejected := false;
	try {
		await { fifo.send("too late"); };
	}
	catch ( ChannelClosedException err ) {
		send_rejected := true;
	}
	ok( send_rejected, "send after close throws ChannelClosedException" );
}

async function test_spawn () {
	let phase := "before";
	let background := spawn {
		await { sleep(0.01); };
		phase := "after";
		42;
	};

	is( phase, "before", "spawned task does not block parent" );
	let background_value := await { background; };
	is( background_value, 42, "spawned task can be awaited" );
	is( phase, "after", "spawned task runs independently" );

	let doomed := spawn {
		throw new Exception( message: "spawn-boom" );
	};
	let saw_spawn_boom := false;
	try {
		await { doomed; };
	}
	catch ( Exception err ) {
		saw_spawn_boom := err.to_String() eq "Exception: spawn-boom";
	}
	ok( saw_spawn_boom, "spawned failure is observed when awaited" );
}

async function main () {
	await { test_basic_tasks(); };
	await { test_timers_yield_and_cancellation(); };
	await { test_combinators(); };
	await { test_channels(); };
	await { test_spawn(); };
}

await { main(); };

done_testing();