from test/more import *;
from std/task import
Channel,
CancellationSource,
all,
failed,
race,
resolved,
sleep,
timeout,
yield;
let yield_log := "";
async function answer_after_yield () {
    let answer := 42;
    await {
        yield();
    };
    return answer;
}
async function double_after_sleep ( n ) {
    await {
        sleep(0.01);
    };
    let doubled := n * 2;
    return doubled;
}
async function fail_after_yield () {
    await {
        yield();
    };
    let boom := new Exception( message: "async-boom" );
    throw boom;
}
async function yield_checkpoint () {
    let entered := yield_log _ "before";
    yield_log := entered;
    await {
        yield();
    };
    let resumed := yield_log _ ",after";
    yield_log := resumed;
    return resumed;
}
async function test_basic_tasks () {
    let pending := answer_after_yield();
    is( typeof pending, "Task", "async function call returns Task" );
    is( pending.done(), false, "new async task is not immediately done" );
    let answer := await { pending; };
    is( answer, 42, "async task can be awaited" );
    ok( pending.done(), "awaited task is done" );
    is( pending.status(), "fulfilled", "awaited task is fulfilled" );

    let ready_task := resolved("ready");
    ok( ready_task.poll(), "resolved task polls as complete" );
    let ready_value := await { ready_task; };
    is( ready_value, "ready", "resolved task awaits value" );

    let saw_task_boom := false;
    try {
        await {
            failed("task-boom");
        };
    }
    catch ( Exception err ) {
        saw_task_boom := err.to_String() eq "Exception: task-boom";
    }
    ok( saw_task_boom, "failed task throws when awaited" );

    let saw_async_boom := false;
    try {
        await {
            fail_after_yield();
        };
    }
    catch ( Exception err ) {
        saw_async_boom := err.to_String() eq "Exception: async-boom";
    }
    ok( saw_async_boom, "async exception propagates through task" );
}
async function test_timers_yield_and_cancellation () {
    let checkpoint_log := await { yield_checkpoint(); };
    is(
        checkpoint_log,
        "before,after",
        "yield can be awaited and resumes the task"
    );

    let timer := sleep(1);
    is( timer.status(), "sleeping", "sleep task reports sleeping" );
    timer.cancel("stop");
    is( timer.status(), "cancelled", "cancel changes task status" );
    ok( timer.done(), "cancelled task is done" );

    let saw_stop := false;
    try {
        await {
            timer;
        };
    }
    catch ( CancelledException err ) {
        saw_stop := err.to_String() eq "CancelledException: stop";
    }
    ok( saw_stop, "awaiting cancelled task throws reason" );

    let canceller := new CancellationSource();
    let token := canceller.token();
    let watched := sleep(1);
    token.watch(watched);
    canceller.cancel("token stop");
    ok( canceller.cancelled(), "cancellation source records cancellation" );
    ok( token.cancelled(), "cancellation token records cancellation" );

    let token_threw := false;
    try {
        token.throw_if_cancelled();
    }
    catch ( CancelledException err ) {
        token_threw := err.to_String() eq "CancelledException: token stop";
    }
    ok( token_threw, "token throws stored cancellation reason" );

    let watched_stopped := false;
    try {
        await {
            watched;
        };
    }
    catch ( CancelledException err ) {
        watched_stopped := err.to_String() eq "CancelledException: token stop";
    }
    ok( watched_stopped, "token cancels watched tasks" );
}
async function test_combinators () {
    let inputs := [
        resolved("a"),
        double_after_sleep(21),
        answer_after_yield(),
    ];
    let gathered := await { all(inputs); };
    is( gathered, [ "a", 42, 42 ], "all preserves ordered results" );

    let slow_sibling := sleep(1);
    let doomed := all( [ failed("all-boom"), slow_sibling ] );
    let saw_all_boom := false;
    try {
        await {
            doomed;
        };
    }
    catch ( Exception err ) {
        saw_all_boom := err.to_String() eq "Exception: all-boom";
    }
    ok( saw_all_boom, "all propagates child rejection" );
    is(
        slow_sibling.status(),
        "cancelled",
        "all cancels unfinished tasks after rejection"
    );

    let slow_racer := sleep(1);
    let first_home := await { race( [ resolved("winner"), slow_racer ] ); };
    is( first_home, "winner", "race returns first resolved task" );
    is( slow_racer.status(), "cancelled", "race cancels losing task" );

    let never_ready := new Channel().recv();
    let saw_timeout := false;
    try {
        await {
            timeout( 0.01, never_ready );
        };
    }
    catch ( TimeoutException err ) {
        saw_timeout := true;
    }
    ok( saw_timeout, "timeout throws TimeoutException" );
    is( never_ready.status(), "cancelled", "timeout cancels wrapped task" );
}
async function test_channels () {
    let queue := new Channel();
    await {
        queue.send("first");
    };
    await {
        queue.send("second");
    };
    let head := await { queue.recv(); };
    is( head, "first", "channel receives first FIFO item" );
    let next := await { queue.recv(); };
    is( next, "second", "channel receives second FIFO item" );

    let waiting := queue.recv();
    await {
        queue.send("later");
    };
    let delivered := await { waiting; };
    is( delivered, "later", "pending recv completes after send" );

    queue.close();
    let after_close := await { queue.recv(); };
    is( after_close, null, "closed drained channel recv returns null" );

    let send_rejected := false;
    try {
        await {
            queue.send("too late");
        };
    }
    catch ( ChannelClosedException err ) {
        send_rejected := true;
    }
    ok( send_rejected, "send after close throws ChannelClosedException" );
}
async function test_spawn () {
    let stage := "before";
    let worker := spawn {
        await {
            sleep(0.01);
        };
        stage := "after";
        42;
    };
    is( stage, "before", "spawned task does not block parent" );
    let worker_result := await { worker; };
    is( worker_result, 42, "spawned task can be awaited" );
    is( stage, "after", "spawned task runs independently" );

    let doomed := spawn {
        throw new Exception( message: "spawn-boom" );
    };
    let saw_spawn_boom := false;
    try {
        await {
            doomed;
        };
    }
    catch ( Exception err ) {
        saw_spawn_boom := err.to_String() eq "Exception: spawn-boom";
    }
    ok( saw_spawn_boom, "spawned failure is observed when awaited" );
}
async function main () {
    let basics := test_basic_tasks();
    await { basics; };

    let timers := test_timers_yield_and_cancellation();
    await { timers; };

    let combinators := test_combinators();
    await { combinators; };

    let channels := test_channels();
    await { channels; };

    let spawning := test_spawn();
    await { spawning; };
}
let suite := main();
await {
    suite;
};
done_testing();