from __future__ import annotations

import asyncio
import functools
import logging
import os
import re
import shutil
import time
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, TextIO, Tuple, Type, Union, cast

import numpy as np
import numpy.typing as npt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import ASYNCHRONOUS
from nio.client import AsyncClient
from nio.client.async_client import AsyncClientConfig
from nio.responses import JoinedRoomsError

from qslib.machine import CommandError, FilterDataFilename, Machine
from qslib.plate_setup import PlateSetup
from qslib.scpi_commands import AccessLevel, ArgList
# Logger shared by everything in this module.
log = logging.getLogger(name="monitor")

# Matches an LEDStatus message body; groups 1-4 capture the Temperature,
# Current, Voltage and JuncTemp values (signed decimals) as byte strings.
LEDSTATUS = re.compile(
    rb"Temperature:([+\-\d.]+) "
    rb"Current:([+\-\d.]+) "
    rb"Voltage:([+\-\d.]+) "
    rb"JuncTemp:([+\-\d.]+)"
)
@dataclass()
class LEDStatus:
    """The four numeric readings carried by an LEDStatus message."""

    # Value of the "Temperature:" field.
    temperature: float
    # Value of the "Current:" field.
    current: float
    # Value of the "Voltage:" field.
    voltage: float
    # Value of the "JuncTemp:" field.
    junctemp: float
@dataclass(frozen=True, eq=True)
class MatrixConfig:
    """Immutable settings for the Matrix account used to post status messages."""

    password: str  # used to log in the Matrix client
    user: str  # user passed to nio's AsyncClient
    room: str  # room that messages are sent to (joined if necessary)
    host: str  # homeserver passed to nio's AsyncClient
    encryption: bool = False  # forwarded as AsyncClientConfig(encryption_enabled=...)
@dataclass(frozen=True, eq=True)
class InfluxConfig:
    """Immutable InfluxDB connection and destination settings."""

    token: str  # API token passed to InfluxDBClient
    org: str  # organization passed to InfluxDBClient
    bucket: str  # bucket every record is written to
    url: str  # server URL passed to InfluxDBClient
@dataclass(frozen=True, eq=True)
class MachineConfig:
    """Immutable settings describing the monitored machine and retry policy."""

    # Machine password (None: connect without one).
    password: Union[str, None] = None
    # Display name used in status announcements.
    name: str = "localhost"
    host: str = "localhost"
    ssl: bool | None = True
    # Converted to int before connecting; None leaves the port choice to Machine.
    port: str | int | None = None
    # Consecutive unexpected failures tolerated before monitoring gives up.
    retries: int = 3
    # Compile the run's EDS file when a run ends.
    compile: bool = False
@dataclass(frozen=True, eq=True)
class SyncConfig:
    """Immutable local directories used to mirror run data (None disables each)."""

    # Directory that completed runs' .eds files are copied into.
    completed_directory: Union[str, None] = None
    # Directory holding per-run folders and .eds archives of runs in progress.
    in_progress_directory: Union[str, None] = None
@dataclass(frozen=True, eq=True)
class Config:
    """Complete, immutable configuration for a monitor instance."""

    # Matrix announcements; the Matrix client is only created when this is set.
    matrix: Union[MatrixConfig, None] = None
    # InfluxDB output; records are only written when this is set.
    influxdb: Union[InfluxConfig, None] = None
    # Shared default instances are safe here because both classes are frozen.
    machine: MachineConfig = MachineConfig()
    sync: SyncConfig = SyncConfig()
@dataclass
class RunState:
    """Progress of the machine's current run, as reported by ``RunProgress?``.

    Fields are ``None`` when the machine reports ``-`` for them.
    """

    # Run title with an optional wrapping <tag>...</tag> removed.
    name: Optional[str] = None
    # Stage number; Run messages may also set a string value here.
    stage: Optional[int | str] = None
    cycle: Optional[int] = None
    step: Optional[int] = None
    # Plate setup of the named run; None when there is no run or it can't be read.
    plate_setup: Optional[PlateSetup] = None

    def refresh(self, c: Machine) -> None:
        """Re-query ``RunProgress?`` (and the plate setup) from *c* in place."""
        runmsg = ArgList.from_string(c.run_command("RunProgress?"))

        name = cast(str, runmsg.opts["RunTitle"])
        if name == "-":
            self.name = None
        else:
            # Keep only the text between an optional <tag> ... </tag> pair.
            self.name = re.sub(r"(<([\w.]+)>)?([^<]+)(</[\w.]+>)?", r"\3", name)

        stage = runmsg.opts["Stage"]
        self.stage = None if stage == "-" else cast(int, stage)

        cycle = runmsg.opts["Cycle"]
        self.cycle = None if cycle == "-" else cast(int, cycle)

        # Step is only read while a stage is set.
        step = runmsg.opts["Step"] if self.stage else None
        self.step = None if step == "-" else cast(Optional[int], step)

        if self.name:
            try:
                self.plate_setup = PlateSetup.from_machine(c)
            except CommandError:
                self.plate_setup = None
        else:
            # No named run: drop any plate setup kept from a previous run so it
            # is not reported again after that run has finished.
            self.plate_setup = None

    @classmethod
    def from_machine(cls: Type[RunState], c: Machine) -> RunState:
        """Create a RunState populated by querying *c* (bypasses ``__init__``)."""
        n = cls.__new__(cls)
        n.refresh(c)
        return n

    @staticmethod
    def _int_or_zero(value: object) -> int:
        """Return ``int(value)``, or 0 if *value* is None or not integer-like."""
        try:
            return int(value)  # type: ignore[call-overload]
        except (TypeError, ValueError):
            return 0

    def statemsg(self, timestamp: str) -> str:
        """Render this state as an InfluxDB line-protocol ``run_state`` record.

        Args:
            timestamp: Already-formatted record timestamp (callers pass
                nanoseconds since the epoch as a string).

        Returns:
            A line such as ``run_state name="x",stage=2i,cycle=3i,step=1i <ts>``.
            With no stage set, stage/cycle/step are all ``0i``.
        """
        # String field values must have backslashes and double quotes escaped,
        # otherwise a run title containing them breaks the whole record.
        escaped = str(self.name).replace("\\", "\\\\").replace('"', '\\"')
        if self.stage:
            # A missing cycle/step or a non-numeric stage would otherwise render
            # as e.g. "Nonei", which is not a valid integer field.
            stage, cycle, step = (self._int_or_zero(v) for v in (self.stage, self.cycle, self.step))
        else:
            stage = cycle = step = 0
        return f'run_state name="{escaped}",stage={stage}i,cycle={cycle}i,step={step}i {timestamp}'
@dataclass
class MachineState:
    """Temperature-control settings and drawer status read from the machine."""

    zone_targets: List[float]  # Zone1..Zone6 values from TBC:SETT?
    zone_controls: List[bool]  # Zone1..Zone6 values from TBC:CONT?
    cover_target: float  # "Cover" value from TBC:SETT?
    cover_control: bool  # "Cover" value from TBC:CONT?
    drawer: str  # raw reply to DRAW?

    def refresh(self, c: Machine) -> None:
        """Query setpoints, control flags and the drawer from *c*, updating in place."""
        zone_keys = [f"Zone{n}" for n in range(1, 7)]

        setpoints = ArgList.from_string(c.run_command("TBC:SETT?")).opts
        self.cover_target = cast(float, setpoints["Cover"])
        self.zone_targets = cast(List[float], [setpoints[key] for key in zone_keys])

        controls = ArgList.from_string(c.run_command("TBC:CONT?")).opts
        self.cover_control = cast(bool, controls["Cover"])
        self.zone_controls = cast(List[bool], [controls[key] for key in zone_keys])

        self.drawer = c.run_command("DRAW?")

    @classmethod
    def from_machine(cls: Type[MachineState], c: Machine) -> MachineState:
        """Build a MachineState by querying *c*; ``__init__`` is bypassed."""
        fresh = cast(MachineState, cls.__new__(cls))
        fresh.refresh(c)
        return fresh
@dataclass
class State:
    """Run progress together with the machine's thermal/drawer state."""

    run: RunState
    machine: MachineState

    @classmethod
    def from_machine(cls: Type[State], c: Machine) -> State:
        """Query *c* for the run state first, then the machine state."""
        return cls(
            RunState.from_machine(c),
            MachineState.from_machine(c),
        )
def index_to_filename_ref(i: Tuple[str, int, int, int, int]) -> str:
    """Format an index tuple as ``S.._C..._T.._P...._M._X.``.

    The tuple is ``(ident, stage, cycle, step, point)``. The ``M`` and ``X``
    components are the characters at positions 4 and 1 of ``ident``
    (presumably a filter id such as ``"x1-m4"`` -> ``M4_X1``; confirm).
    """
    ident, stage, cycle, step, point = i
    pieces = (
        f"S{stage:02}",
        f"C{cycle:03}",
        f"T{step:02}",
        f"P{point:04}",
        f"M{ident[4]}",
        f"X{ident[1]}",
    )
    return "_".join(pieces)
def get_runinfo(c: Machine) -> State:
    """Return the combined run and machine state queried from *c*."""
    return State.from_machine(c)
class Collector:
    """Route machine subscription messages to InfluxDB, Matrix and run directories.

    InfluxDB records are written only when ``config.influxdb`` is set, Matrix
    messages are sent only when ``config.matrix`` is set, run data are mirrored
    only when ``config.sync.in_progress_directory`` is set, and completed EDS
    files are copied only when ``config.sync.completed_directory`` is set.
    """

    def __init__(self, config: Config):
        self.config = config

        if self.config.influxdb:
            self.idbclient = InfluxDBClient(
                url=self.config.influxdb.url,
                token=self.config.influxdb.token,
                org=self.config.influxdb.org,
            )
            self.idbw = self.idbclient.write_api(write_options=ASYNCHRONOUS)
        else:
            self.idbw = None

        # Always defined, so code that checks for Matrix support does not hit
        # AttributeError when Matrix is not configured.
        self.matrix_config: AsyncClientConfig | None = None
        self.matrix_client: AsyncClient | None = None
        if self.config.matrix:
            self.matrix_config = AsyncClientConfig(encryption_enabled=self.config.matrix.encryption)
            self.matrix_client = AsyncClient(
                self.config.matrix.host,
                self.config.matrix.user,
                store_path="./matrix_store/",
                config=self.matrix_config,
            )

        log.info(config.sync)
        # messages.log of the run currently mirrored in the in-progress directory.
        self.run_log_file: TextIO | None = None
        # Strong references to fire-and-forget tasks: the event loop keeps only
        # weak references, so an unreferenced task can be garbage-collected early.
        self._tasks: set[asyncio.Task[None]] = set()

    def inject(self, t: str | Iterable[str | Point] | Point, flush: bool = False) -> None:
        """Write record(s) to the configured InfluxDB bucket; no-op without InfluxDB.

        Args:
            t: A line-protocol string, a Point, or an iterable of either.
            flush: Flush the write API's buffer right after writing.
        """
        if self.idbw is None:
            return
        assert self.config.influxdb is not None
        self.idbw.write(bucket=self.config.influxdb.bucket, record=t)
        if flush:
            self.idbw.flush()

    async def matrix_announce(self, msg: str) -> None:
        """Post *msg* as plain text to the configured Matrix room, then sync.

        Does nothing when Matrix is not configured.
        """
        if self.config.matrix is None or self.matrix_client is None:
            return
        await self.matrix_client.room_send(
            room_id=self.config.matrix.room,
            message_type="m.room.message",
            content={"msgtype": "m.text", "body": msg},
            ignore_unverified_devices=True,
        )
        await self.matrix_client.sync()

    def _close_run_log(self) -> None:
        """Close and forget the current run's messages.log handle, if any."""
        if self.run_log_file is not None:
            self.run_log_file.close()
            self.run_log_file = None

    def _write_run_log(self, topic: str, timestamp: float | None, message: str) -> None:
        """Append one received message to the current run's messages.log, if open."""
        if self.run_log_file is not None:
            self.run_log_file.write(f"{topic} {timestamp} {message}")
            self.run_log_file.flush()

    def _run_blocking(self, job: functools.partial[None]) -> None:
        """Run blocking *job* on the default executor, logging any exception it raises."""
        fut = asyncio.get_running_loop().run_in_executor(None, job)
        job_name = getattr(job.func, "__name__", repr(job.func))

        def _report(done: asyncio.Future[None]) -> None:
            # Without this, exceptions from executor jobs are never observed.
            if not done.cancelled() and (exc := done.exception()) is not None:
                log.error(f"Background job {job_name} failed: {exc!r}", exc_info=exc)

        fut.add_done_callback(_report)

    def setup_new_rundir(
        self,
        connection: Machine,
        name: str,
        *,
        firstmsg: str | None = None,
        overwrite: bool = False,
    ) -> None:
        """Create ``<ipdir>/<name>`` from the machine's experiment directory.

        The experiment directory is downloaded as a ZIP and extracted, the
        quant/filter/calibrations subdirectories are ensured, and
        ``apldbio/sds/messages.log`` is opened (append) as the current run log,
        replacing and closing any previously open run log.

        Args:
            connection: Machine to download the experiment directory from.
            name: Run name, used as the directory name.
            firstmsg: Text written to the new log immediately after opening it.
            overwrite: Replace an existing directory instead of giving up.

        Raises:
            ValueError: If a ZIP member would be extracted outside the run directory.
        """
        ipdir = self.ipdir
        assert ipdir is not None
        if not ipdir.is_dir():
            log.error(f"Can't open in-progress directory {ipdir}.")
            return

        dirpath = ipdir / name
        if dirpath.exists():
            if not overwrite:
                log.error(f"In-progress directory for {name} already exists.")
                return
            assert dirpath != ipdir
            shutil.rmtree(dirpath)
        dirpath.mkdir()

        target = dirpath.resolve()
        with connection.read_dir_as_zip(name, "experiment") as zf:
            for member in zf.namelist():
                member_path = (target / member).resolve()
                # Compare path components, not string prefixes: "/x/run" must not
                # accept a member that resolves into a sibling such as "/x/run2".
                if not member_path.is_relative_to(target):
                    raise ValueError(f"ZIP member {member!r} would extract outside target directory")
            zf.extractall(dirpath)

        sds = dirpath / "apldbio" / "sds"
        for sub in ("quant", "filter", "calibrations"):
            (sds / sub).mkdir(parents=True, exist_ok=True)

        new_log = (sds / "messages.log").open("a")
        if firstmsg is not None:
            new_log.write(firstmsg)
            new_log.flush()
        # Publish the new handle before closing the old one, so message handlers
        # never find a closed file in run_log_file.
        old_log, self.run_log_file = self.run_log_file, new_log
        if old_log is not None:
            old_log.close()

    @property
    def ipdir(self) -> Path | None:
        """The configured in-progress directory, or None if not configured."""
        x = self.config.sync.in_progress_directory
        return None if x is None else Path(x)

    def run_ip_path(self, name: str) -> Path:
        """Return ``<ipdir>/<name>/apldbio/sds``.

        Raises:
            ValueError: If no in-progress directory is configured.
        """
        ipdir = self.ipdir
        if ipdir is None:
            raise ValueError("No in-progress directory configured.")
        return ipdir / name / "apldbio" / "sds"

    def compile_eds(self, connection: Machine, name: str) -> None:
        """Ask the machine to compile run *name* into an EDS file.

        Sleeps 300 s first (blocking; callers run this in an executor thread),
        raises the access level to Controller for the compile, and always drops
        back to Observer afterwards.

        Raises:
            FileNotFoundError: Propagated from the machine calls.
        """
        # NOTE(review): the delay presumably gives the machine time to finish
        # writing the run's files before compiling — confirm.
        time.sleep(300.0)
        try:
            connection.set_access_level(AccessLevel.Controller)
            connection.compile_eds(name)
        finally:
            connection.set_access_level(AccessLevel.Observer)

    def sync_completed(self, connection: Machine, name: str) -> None:
        """Compile run *name*, copy its EDS into the completed directory, clean up.

        Errors are logged, not raised. After a successful copy the run's
        in-progress directory and in-progress ``.eds`` file are removed.
        """
        try:
            self.compile_eds(connection, name)
        except FileNotFoundError:
            # Tolerated: the copy below is still attempted.
            pass

        completed = self.config.sync.completed_directory
        if not completed:
            log.error(f"No completed directory configured; not syncing {name}.")
            return
        compdir = Path(completed)
        if not compdir.is_dir():
            log.error(f"Can't sync completed EDS to invalid path {compdir}.")
            return
        path = compdir / (name + ".eds")
        if path.exists():
            log.error(f"Completed EDS already exists for {name}.")
            return
        try:
            # Download before creating the destination so a failed read does not
            # leave an empty file that would later be reported as "already exists".
            edsfile = connection.read_file(f"public_run_complete:{name}.eds")
            path.write_bytes(edsfile)
        except Exception as e:
            path.unlink(missing_ok=True)
            log.error(f"Error synchronizing completed EDS {name}: {e}")
            return

        ipdir = self.ipdir
        if ipdir:
            rundir = ipdir / name
            if rundir.exists():
                shutil.rmtree(rundir)
            ipeds = ipdir / (name + ".eds")
            if ipeds.exists():
                ipeds.unlink()

    def docollect(
        self,
        args: Dict[str, Union[str, int, bool, float]],
        state: State,
        connection: Machine,
    ) -> None:
        """Fetch the newest filter data for a ``Collected`` event and record it.

        Makes blocking machine calls; run it in an executor thread.

        Args:
            args: Options of the Collected message: ``run`` plus integer-like
                ``stage``, ``cycle``, ``step`` and ``point`` values. Not modified.
            state: Current state; its plate setup (if any) labels the samples.
            connection: Machine to read filter data from.

        The data are written to InfluxDB. When the run's in-progress filter
        directory exists, the raw files are also saved there and
        ``<ipdir>/<run>.eds`` is rebuilt from the in-progress run directory.
        """
        pa: npt.NDArray[np.object_] | None = (
            state.run.plate_setup.well_samples_as_array() if state.run.plate_setup else None
        )

        run = cast(str, args["run"])
        if run.startswith('"'):
            run = run[1:-1]
        index = {k: int(v) for k, v in args.items() if k != "run"}

        pattern = "{run}/apldbio/sds/filter/S{stage:02}_C{cycle:03}_T{step:02}_P{point:04}_*_filterdata.xml".format(
            run=run, **index
        )
        pl = sorted(
            FilterDataFilename.fromstring(x)
            for x in connection.get_expfile_list(pattern, allow_nomatch=True)
        )
        if not pl:
            log.warning(f"No filter data found for {pattern}.")
            return
        # Keep only files for the same point as the last (greatest) filename.
        toget = [x for x in pl if x.is_same_point(pl[-1])]

        ipdir = self.ipdir
        save_files = ipdir is not None and (ipdir / run / "apldbio" / "sds" / "filter").exists()

        lp: List[str] = []
        files: list[tuple[str, bytes]] = []
        for fdf in toget:
            if save_files:
                fdr, files_one = connection.get_filterdata_one(fdf, return_files=True)
                files += files_one
            else:
                fdr = connection.get_filterdata_one(fdf)
            lp += fdr.to_lineprotocol(run_name=run, sample_array=pa)
        self.inject(lp, flush=True)

        if not save_files:
            return
        assert ipdir is not None
        for path, data in files:
            (self.run_ip_path(run) / path).write_bytes(data)

        # Rebuild <ipdir>/<run>.eds as a ZIP of the whole in-progress run directory.
        rundir = ipdir / run
        with zipfile.ZipFile(ipdir / (run + ".eds"), "w") as z:
            for root, _, zfiles in os.walk(rundir):
                for zfile in zfiles:
                    fpath = os.path.join(root, zfile)
                    z.write(fpath, os.path.relpath(fpath, rundir))

    def _inject_progress(self, action: str, value: object, timestamp: int) -> None:
        """Record a Stage/Cycle/Step change as both run_action and run_status points."""
        key = action.lower()
        for measurement in ("run_action", "run_status"):
            self.inject(
                Point(measurement).tag("type", key).field(key, value).time(timestamp).to_line_protocol()
            )

    def handle_run_msg(
        self: Collector,
        state: State,
        c: Machine,
        topic: bytes,
        message: bytes,
        timestamp: float | None,
    ) -> None:
        """Handle a ``Run`` subscription message.

        Updates *state*, writes run events to InfluxDB, announces terminal events
        via Matrix (if configured), and schedules blocking follow-up work
        (run-directory setup, filter-data collection, EDS compilation/sync) on
        the default executor. Must be called from inside the running event loop.
        """
        topic_str = topic.decode()
        message_str = message.decode()
        self._write_run_log(topic_str, timestamp, message_str)
        assert timestamp is not None
        ts = int(1e9 * timestamp)  # InfluxDB timestamp in nanoseconds

        msg = ArgList.from_string(message_str)
        log.debug(msg)
        contents = msg.args
        action = cast(str, contents[0])

        if action == "Stage":
            assert isinstance(contents[1], (str, int))
            state.run.stage = contents[1]
            self._inject_progress(action, contents[1], ts)
        elif action == "Cycle":
            state.run.cycle = int(contents[1])
            self._inject_progress(action, contents[1], ts)
        elif action == "Step":
            state.run.step = int(contents[1])
            self._inject_progress(action, contents[1], ts)
        elif action == "Holding":
            self.inject(f"run_action,type=Holding holdtime={msg.opts['time']} {ts}")
        elif action == "Ramping":
            state.machine.zone_targets = [float(x) for x in cast(list[float], msg.opts["targets"][0])]
            self.inject(f'run_action,type={action} run_name="{state.run.name}" {ts}')
        elif action == "Acquiring":
            self.inject(f'run_action,type={action} run_name="{state.run.name}" {ts}')
        elif action in ("Error", "Ended", "Aborted", "Stopped", "Starting"):
            self.inject(f'run_action,type={action} run_name="{state.run.name}" {ts}')
            if self.config.matrix is not None:
                task = asyncio.create_task(
                    self.matrix_announce(
                        f"{self.config.machine.name} status: {action} {' '.join(str(x) for x in contents[1:])}"
                    )
                )
                self._tasks.add(task)
                task.add_done_callback(self._tasks.discard)
            if action == "Ended":
                self._close_run_log()
                if self.config.machine.compile:
                    assert state.run.name
                    # None or "" means no completed directory: compile only.
                    if self.config.sync.completed_directory:
                        self._run_blocking(functools.partial(self.sync_completed, c, state.run.name))
                    else:
                        self._run_blocking(functools.partial(self.compile_eds, c, state.run.name))
            elif action == "Starting" and self.ipdir:
                newname = cast(str, contents[1]).strip('"')
                self._run_blocking(
                    functools.partial(
                        self.setup_new_rundir,
                        c,
                        newname,
                        # Same "<topic> <seconds> <message>" format as other log lines.
                        firstmsg=f"\n{topic_str} {timestamp} {message_str}",
                    )
                )
        elif action == "Collected":
            self.inject(f'run_action,type={action} run_name="{state.run.name}" {ts}')
            # docollect makes blocking machine calls, so keep it off the event loop.
            self._run_blocking(functools.partial(self.docollect, msg.opts, state, c))
        else:
            self.inject(
                Point("run_action")
                .tag("type", "Other")
                .tag("run_name", state.run.name)
                .field("message", " ".join(str(x) for x in contents))
                .time(ts)
            )

        state.run.refresh(c)
        state.machine.refresh(c)
        log.info(message_str)
        self.inject(state.run.statemsg(str(ts)))
        if state.run.plate_setup:
            self.inject(state.run.plate_setup.to_lineprotocol(ts, state.run.name))
        if self.idbw:
            self.idbw.flush()

    def handle_led(self, topic: bytes, message: bytes, timestamp: float | None) -> None:
        """Handle an ``LEDStatus`` message by writing a ``lamp`` point to InfluxDB."""
        self._write_run_log(topic.decode(), timestamp, message.decode())
        assert timestamp is not None
        ls = LEDSTATUS.match(message)
        if ls is None:
            log.warning(f"Unparseable LEDStatus message: {message!r}")
            return
        p = (
            Point("lamp")
            .field("temperature", float(ls[1].decode()))
            .field("current", float(ls[2].decode()))
            .field("voltage", float(ls[3].decode()))
            .field("junctemp", float(ls[4].decode()))
            .time(int(1e9 * timestamp))
        )
        self.inject(p, flush=True)

    def handle_msg(
        self,
        state: State,
        c: Machine,
        topic: bytes,
        message: bytes,
        timestamp: float | None,
    ) -> None:
        """Handle ``Temperature`` and ``Time`` subscription messages."""
        self._write_run_log(topic.decode(), timestamp, message.decode())
        assert timestamp is not None
        ts = int(1e9 * timestamp)
        args = ArgList.from_string(message.decode()).opts
        log.debug(f"Handling message {topic.decode()} {message.decode()}")
        if topic == b"Temperature":
            samples = [float(x) for x in cast(list[float], args["sample"][0])]
            blocks = [float(x) for x in cast(list[float], args["block"][0])]
            recs: list[str | Point] = [
                f"temperature,loc=zones,zone={i} sample={s},block={b},target={t} {ts}"
                for i, (s, b, t) in enumerate(zip(samples, blocks, state.machine.zone_targets))
            ]
            # Stamp cover/heatsink with the message time as well; without .time()
            # they would get the write time instead, unlike the zone records.
            recs.append(Point("temperature").tag("loc", "cover").field("cover", args["cover"]).time(ts))
            recs.append(Point("temperature").tag("loc", "heatsink").field("heatsink", args["heatsink"]).time(ts))
            self.inject(recs)
        elif topic == b"Time":
            p = Point("run_time")
            for key in ("elapsed", "remaining", "active"):
                if key in args:
                    p = p.field(key, args[key])
            p.time(ts)
            self.inject(p)
        if self.idbw:
            self.idbw.flush()

    async def monitor(self, connected_fut: asyncio.Future[bool] | None = None) -> None:
        """Connect to the machine and process its messages until the connection ends.

        Logs in to Matrix (if configured), records the initial state, prepares
        the in-progress directory of an active run, subscribes to Temperature,
        Time, Run and LEDStatus messages with handlers, then watches the link.

        Args:
            connected_fut: Set to True once subscriptions are in place; left
                untouched if already done (e.g. when reconnecting).

        Raises:
            TimeoutError: If no data arrived for 60 s and ``ISTAT?`` timed out.
        """
        if self.config.matrix is not None and self.matrix_client is not None:
            await self.matrix_client.login(self.config.matrix.password)
            joinedroomresp = await self.matrix_client.joined_rooms()
            if isinstance(joinedroomresp, JoinedRoomsError):
                log.error(joinedroomresp)
                joinedrooms = []
            else:
                joinedrooms = joinedroomresp.rooms
            if self.config.matrix.room not in joinedrooms:
                await self.matrix_client.join(self.config.matrix.room)

        with Machine(
            host=self.config.machine.host,
            port=(int(self.config.machine.port) if self.config.machine.port is not None else None),
            ssl=self.config.machine.ssl,
            password=self.config.machine.password,
        ) as c:
            log.info("monitor connected")
            state = get_runinfo(c)
            log.info(f"status info: {state}")
            self.inject(state.run.statemsg(str(time.time_ns())))
            if state.run.plate_setup:
                self.inject(state.run.plate_setup.to_lineprotocol(time.time_ns(), state.run.name))
            if self.idbw:
                self.idbw.flush()
            if state.run.name and self.ipdir:
                self.setup_new_rundir(c, state.run.name, overwrite=True)

            c.run_command("SUBS -timestamp Temperature Time Run LEDStatus")
            log.debug("subscriptions made")
            for t in (b"Temperature", b"Time"):
                c._protocol.topic_handlers[t] = functools.partial(self.handle_msg, state, c)
            c._protocol.topic_handlers[b"Run"] = functools.partial(self.handle_run_msg, state, c)
            c._protocol.topic_handlers[b"LEDStatus"] = self.handle_led
            log.info(c._protocol.topic_handlers)

            # On reconnects the future is already resolved; setting it again
            # would raise InvalidStateError.
            if connected_fut is not None and not connected_fut.done():
                connected_fut.set_result(True)

            log_conn = c.connection.subscribe_log()
            while True:
                # NOTE(review): next() is a synchronous call inside a coroutine;
                # confirm it cannot block the event loop.
                nextlog = next(log_conn)
                log.info(f"log: {nextlog}")
                if c._protocol.lostconnection.done():
                    log.error("Lost connection.")
                    break
                if time.time() - c._protocol.last_received <= 60.0:
                    continue
                try:
                    await c.run_command_bytes_with_timeout(b"ISTAT?", 30)
                except (asyncio.TimeoutError, TimeoutError):
                    # Before Python 3.11, asyncio.TimeoutError is not the builtin.
                    log.error("No data received in 60 seconds and ISTAT? test timed out. Trying to disconnect.")
                    c.disconnect()
                    raise TimeoutError

    async def reliable_monitor(self, connected_fut: asyncio.Future[bool] | None = None) -> None:
        """Run :meth:`monitor` repeatedly, reconnecting 30 s after each exit.

        Timeouts and OS-level connection errors are retried indefinitely. Other
        exceptions are retried up to ``config.machine.retries`` times in a row;
        after that a Matrix message is sent (if configured) and this returns.
        """
        log.info("starting reconnectable monitoring")
        retries = self.config.machine.retries
        successive_failures = 0
        while True:
            try:
                await self.monitor(connected_fut=connected_fut)
            except (asyncio.TimeoutError, TimeoutError) as e:
                successive_failures = 0
                log.warning(f"lost connection with timeout {e}", exc_info=True)
            except OSError as e:
                log.error(f"connection error {e}, retrying", exc_info=True)
            except Exception as e:
                if retries - successive_failures > 0:
                    log.error(
                        f"Error {repr(e)}\nRetrying {retries - successive_failures} times",
                        exc_info=True,
                    )
                    successive_failures += 1
                else:
                    log.critical(f"giving up, error {e}", exc_info=True)
                    try:
                        await self.matrix_announce(
                            f"Unrecoverable error in QS monitoring (tried {retries} times), giving up: {e}, {e.__traceback__}"
                        )
                    except Exception as matrix_e:
                        log.error(f"Failed to send Matrix message: {matrix_e}")
                    return
            log.debug("awaiting retry")
            await asyncio.sleep(30)