from std/eval import eval;
from std/task import sleep;
from test/more import *;
from test/worker_contract import
await_cancelled,
await_worker_failure,
require_worker;
from test/worker_phase5/functions import worker_phase5_imported_add;
let Worker := require_worker();
const worker_suite_offset := 10;
function worker_suite_add_offset ( x ) {
return x + worker_suite_offset;
}
trait WorkerSuiteLabelled {
method label () -> String {
return "trait:" _ self.get_name();
}
}
class WorkerSuiteThing with WorkerSuiteLabelled {
let String name with get := "Ada";
}
class WorkerSuiteBox {
let Number value with get, set := 0;
method label ( String prefix ) -> String {
return prefix _ ":" _ value;
}
}
class WorkerSuiteReturnBox {
let String name with get, set := "unset";
method label () -> String {
return "box:" _ name;
}
}
async function test_spawn_values () {
ok( Worker can "spawn", "Worker exposes spawn" );
let scalar_task := Worker.spawn(
function ( x ) {
return x * 2;
},
[ 21 ],
);
is( typeof scalar_task, "Task", "Worker.spawn returns a Task" );
is( await { scalar_task; }, 42, "worker returns scalar result" );
let collection := await {
Worker.spawn(
function () {
return {
array: [ 1, 2, 3 ],
dict: { name: "Ada", score: 42 },
set: << "red", "blue", "red" >>,
bag: <<< 1, 1, 2 >>>,
};
},
[],
);
};
is( collection{array}, [ 1, 2, 3 ], "worker returns array" );
is( collection{dict}{name}, "Ada", "worker returns dict field" );
is( collection{set}.length(), 2, "worker returns deduplicated set" );
is( collection{bag}.count(1), 2, "worker returns bag counts" );
let data := [ 1, 2 ];
let copy_task := Worker.spawn(
function ( copy ) {
return [ copy[0], copy.length(), copy.get( 2, "missing" ) ];
},
[ data ],
);
data[0] := 9;
data.push(3);
is(
await { copy_task; },
[ 1, 2, "missing" ],
"worker receives a spawn-time copy",
);
}
async function test_spawn_code_and_objects () {
let callback_result := await {
Worker.spawn(
function ( callback, value ) {
return callback(value) + 1;
},
[ worker_suite_add_offset, 31 ],
);
};
is(
callback_result,
42,
"worker receives and calls marshalled user function",
);
is(
await { Worker.spawn( worker_phase5_imported_add, [ 36 ] ); },
42,
"worker receives and calls imported function",
);
let box := new WorkerSuiteBox( value: 41 );
let box_result := await {
Worker.spawn(
function ( copy ) {
copy.set_value( copy.get_value() + 1 );
return [ copy.label("worker"), copy.get_value() ];
},
[ box ],
);
};
is( box_result, [ "worker:42", 42 ], "worker calls copied object methods" );
is( box.get_value(), 41, "worker object mutation stays isolated" );
is(
await { Worker.spawn( box{"label"}, [ "bound" ] ); },
"bound:41",
"worker receives bound method callable",
);
let class_result := await {
Worker.spawn(
function ( klass ) {
let obj := new klass();
return [ typeof obj, obj.label() ];
},
[ WorkerSuiteThing ],
);
};
is( class_result[0], "WorkerSuiteThing", "worker reconstructs user class" );
is( class_result[1], "trait:Ada", "worker reconstructs trait dependency" );
let returned_box := await {
Worker.spawn(
function ( name ) {
return new WorkerSuiteReturnBox( name: name );
},
[ "Ada" ],
);
};
is( typeof returned_box, "WorkerSuiteReturnBox", "parent receives object" );
is( returned_box.label(), "box:Ada", "returned object method works" );
}
async function test_failures_policy_and_result () {
let err := await {
await_worker_failure(
Worker.spawn(
function () {
die "worker-boom";
},
[],
),
);
};
ok( err instanceof Exception, "worker failure is catchable" );
like( err.to_String(), /worker-boom/, "worker failure includes message" );
let denials := await {
Worker.spawn(
function () {
return [
__system__{deny_fs},
__system__{deny_net},
__system__{deny_perl},
__system__{deny_js},
__system__{deny_proc},
__system__{deny_db},
__system__{deny_clib},
__system__{deny_gui},
__system__{deny_worker},
];
},
[],
deny_fs: true,
deny_net: true,
deny_perl: true,
deny_js: true,
deny_proc: true,
deny_db: true,
deny_clib: true,
deny_gui: true,
deny_worker: true,
);
};
is(
denials,
[ true, true, true, true, true, true, true, true, true ],
"worker deny flags are reflected in __system__",
);
let import_error := exception( function () {
eval( "from std/worker import Worker; true;", deny_worker: true );
} );
ok(
import_error instanceof Exception,
"deny_worker prevents std/worker imports",
);
like(
import_error.to_String(),
/(denied|unavailable|not found|worker)/i,
"deny_worker import failure mentions worker denial",
);
let denied_err := await {
await_worker_failure(
Worker.spawn(
function () {
from std/io import Path;
return new Path("not-read.txt").slurp_utf8();
},
[],
deny_fs: true,
),
);
};
like(
denied_err.to_String(),
/(denied|unavailable|std\/io|fs)/i,
"deny_fs blocks std/io in worker",
);
let ok_result := await {
Worker.spawn(
function ( value ) {
from std/result import Result;
return Result.ok(value + 1);
},
[ 41 ],
);
};
is( typeof ok_result, "Result", "worker may return Result.ok" );
is( ok_result.unwrap(), 42, "worker Result.ok unwraps in parent" );
let err_result := await {
Worker.spawn(
function () {
from std/result import Result;
return Result.err("worker-result-error");
},
[],
);
};
is( err_result.unwrap_err(), "worker-result-error", "Result.err survives" );
}
async function test_worker_handle () {
ok( Worker can "spawn_handle", "Worker exposes spawn_handle" );
let handle := Worker.spawn_handle(
async function ( inbox, prefix ) {
let first := await {
inbox.recv();
};
let second := await {
inbox.recv();
};
await {
inbox.send( prefix _ ":" _ second );
};
await {
inbox.send( prefix _ ":" _ first );
};
inbox.close();
return [ first, second ];
},
[ "echo" ],
);
await {
handle.send("one");
};
await {
handle.send("two");
};
is( await { handle.recv(); }, "echo:two", "handle receives first reply" );
is( await { handle.recv(); }, "echo:one", "handle receives second reply" );
is(
await { handle.result(); },
[ "one", "two" ],
"worker sees parent messages in FIFO order",
);
ok( handle.done(), "handle done delegates to result task" );
is( handle.status(), "fulfilled", "handle status delegates to result task" );
let send_error := null;
try {
await {
handle.send("late");
};
}
catch ( Exception e ) {
send_error := e;
}
let send_error_text := send_error ≡ null ? "" : send_error.to_String();
like(
send_error_text,
/ChannelClosedException/,
"send after worker close rejects",
);
let waiting := Worker.spawn_handle(
async function ( inbox ) {
await {
inbox.recv();
};
return "unexpected";
},
[],
);
waiting.cancel("stop");
await {
await_cancelled( waiting.result() );
};
ok( waiting.done(), "cancelled handle is done" );
}
async function test_cancellation () {
let task := Worker.spawn(
async function () {
from std/task import yield;
let i := 0;
while ( i < 1000000 ) {
i := i + 1;
if ( i mod 100 = 0 ) {
await {
yield();
};
}
}
return "finished";
},
[],
);
await {
sleep(0.02);
};
task.cancel("checkpoint");
let err := await {
await_cancelled(task);
};
like(
err.to_String(),
/checkpoint/,
"worker observes cancellation at yield checkpoint",
);
}
async function __main__ (args) {
await {
test_spawn_values();
};
await {
test_spawn_code_and_objects();
};
await {
test_failures_policy_and_result();
};
await {
test_worker_handle();
};
await {
test_cancellation();
};
done_testing();
}