# Version of this IMAP4 client module (reported in debug output).
__version__ = "2.60"

import binascii
import errno
import random
import re
import socket
import subprocess
import sys
import time
import calendar
from datetime import datetime, timezone, timedelta
from io import DEFAULT_BUFFER_SIZE

# TLS support is optional: remember whether the ssl module could be imported
# so that starttls() and the IMAP4_SSL class can be enabled conditionally.
try:
    import ssl
except ImportError:
    HAVE_SSL = False
else:
    HAVE_SSL = True

# Public names; "IMAP4_SSL" is appended further down when ssl is available.
__all__ = [
    "IMAP4",
    "IMAP4_stream",
    "Internaldate2tuple",
    "Int2AP",
    "ParseFlags",
    "Time2Internaldate",
]
# Line terminator appended to every command and literal sent to the server.
CRLF = b'\r\n'
# Default debug level; copied into IMAP4.debug when an instance is created.
Debug = 0
# Default TCP port used by IMAP4.
IMAP4_PORT = 143
# Default TCP port used by IMAP4_SSL.
IMAP4_SSL_PORT = 993
# Protocol versions accepted by IMAP4._connect(); the first one found in the
# server's capability list becomes PROTOCOL_VERSION.
AllowedVersions = ('IMAP4REV1', 'IMAP4')
# Upper bound on the length of one response line; IMAP4.readline() raises
# IMAP4.error when a line is longer than this many bytes.
_MAXLINE = 1000000
# Maps each known IMAP4 command name to the tuple of connection states in
# which the command may be issued.  IMAP4._command() and IMAP4.uid() refuse a
# command whose table entry does not contain the current state, and
# IMAP4.xatom() adds entries for extension commands at run time.
Commands = {
    # name            valid states
    'APPEND':         ('AUTH', 'SELECTED'),
    'AUTHENTICATE':   ('NONAUTH',),
    'CAPABILITY':     ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
    'CHECK':          ('SELECTED',),
    'CLOSE':          ('SELECTED',),
    'COPY':           ('SELECTED',),
    'CREATE':         ('AUTH', 'SELECTED'),
    'DELETE':         ('AUTH', 'SELECTED'),
    'DELETEACL':      ('AUTH', 'SELECTED'),
    'ENABLE':         ('AUTH',),
    'EXAMINE':        ('AUTH', 'SELECTED'),
    'EXPUNGE':        ('SELECTED',),
    'FETCH':          ('SELECTED',),
    'GETACL':         ('AUTH', 'SELECTED'),
    'GETANNOTATION':  ('AUTH', 'SELECTED'),
    'GETQUOTA':       ('AUTH', 'SELECTED'),
    'GETQUOTAROOT':   ('AUTH', 'SELECTED'),
    'IDLE':           ('AUTH', 'SELECTED'),
    'MYRIGHTS':       ('AUTH', 'SELECTED'),
    'LIST':           ('AUTH', 'SELECTED'),
    'LOGIN':          ('NONAUTH',),
    'LOGOUT':         ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
    'LSUB':           ('AUTH', 'SELECTED'),
    'MOVE':           ('SELECTED',),
    'NAMESPACE':      ('AUTH', 'SELECTED'),
    'NOOP':           ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
    'PARTIAL':        ('SELECTED',),
    'PROXYAUTH':      ('AUTH',),
    'RENAME':         ('AUTH', 'SELECTED'),
    'SEARCH':         ('SELECTED',),
    'SELECT':         ('AUTH', 'SELECTED'),
    'SETACL':         ('AUTH', 'SELECTED'),
    'SETANNOTATION':  ('AUTH', 'SELECTED'),
    'SETQUOTA':       ('AUTH', 'SELECTED'),
    'SORT':           ('SELECTED',),
    'STARTTLS':       ('NONAUTH',),
    'STATUS':         ('AUTH', 'SELECTED'),
    'STORE':          ('SELECTED',),
    'SUBSCRIBE':      ('AUTH', 'SELECTED'),
    'THREAD':         ('SELECTED',),
    'UID':            ('SELECTED',),
    'UNSUBSCRIBE':    ('AUTH', 'SELECTED'),
    'UNSELECT':       ('SELECTED',),
}
# Patterns used to parse server responses.  All of them operate on bytes.

# '+' continuation request; 'data' is the optional text after the space.
Continuation = re.compile(br'\+( (?P<data>.*))?')
# Parenthesised flag list following the word FLAGS (used by ParseFlags()).
Flags = re.compile(br'.*FLAGS \((?P<flags>[^\)]*)\)')
# Quoted INTERNALDATE value: dd-Mon-yyyy hh:mm:ss +zzzz
# (used by Internaldate2tuple()).
InternalDate = re.compile(br'.*INTERNALDATE "'
    br'(?P<day>[ 0123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])'
    br' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
    br' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
    br'"')
# Line ending in a '{size}' literal announcement.
# NOTE(review): the IMAP4 code in this file matches with the per-instance
# self.Literal (compiled from _Literal below), not with this module-level
# pattern.
Literal = re.compile(br'.*{(?P<size>\d+)}$', re.ASCII)
# Any line-ending convention; IMAP4.append() rewrites matches to CRLF.
MapCRLF = re.compile(br'\r\n|\r|\n')
# Bracketed response code such as '[READ-ONLY]' or '[UIDVALIDITY 1]'.
Response_code = re.compile(br'\[(?P<type>[A-Z-]+)( (?P<data>.*))?\]')
# Untagged response: '* TYPE [data]'.
Untagged_response = re.compile(br'\* (?P<type>[A-Z-]+)( (?P<data>.*))?')
# Untagged response that starts with a number: '* <number> TYPE [data2]'.
# NOTE(review): as with Literal, IMAP4 uses the per-instance
# self.Untagged_status compiled from _Untagged_status below.
Untagged_status = re.compile(
    br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?', re.ASCII)
# Raw pattern sources; IMAP4._mode_ascii() compiles them with re.ASCII and
# IMAP4._mode_utf8() compiles them without it.
_Literal = br'.*{(?P<size>\d+)}$'
_Untagged_status = br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?'
class IMAP4:
    """IMAP4 client connection over a plain TCP socket.

    Instantiate with IMAP4(host='', port=IMAP4_PORT, timeout=None).  The
    constructor opens the connection, reads the server greeting and fetches
    the server capabilities.

    Every command listed in ``Commands`` is available as a lower-case
    method; upper-case attribute names are mapped onto them by
    __getattr__().  Command methods return a tuple ``(typ, [data, ...])``
    where *typ* is the completion status reported by the server and *data*
    is the list of response payloads collected for the command.

    Exceptions defined on the class:
        error     -- base class for every error raised by this class
        abort     -- raised for socket failures, unexpected responses and
                     server BYE
        readonly  -- raised when a mailbox turns out to be READ-ONLY
                     although write access was requested
    """

    class error(Exception): pass
    class abort(error): pass
    class readonly(abort): pass
    # Raised by _get_response() when a start_timeout expires (used by Idler).
    class _responsetimeout(TimeoutError): pass

    def __init__(self, host='', port=IMAP4_PORT, timeout=None):
        self.debug = Debug
        self.state = 'LOGOUT'
        self.literal = None             # Literal argument for next command
        self.tagged_commands = {}       # {tag: None | (typ, [data])}
        self.untagged_responses = {}    # {typ: [data, ...], ...}
        self.continuation_response = '' # Data of last '+' response
        self._idle_responses = []       # Queue filled while _idle_capture
        self._idle_capture = False      # Route untagged responses to queue
        self.is_readonly = False        # READ-ONLY was requested by select()
        self.tagnum = 0
        self._tls_established = False
        self._mode_ascii()
        self._readbuf = []              # Chunks received but not yet consumed

        # Open socket to server.
        self.open(host, port, timeout)

        try:
            self._connect()
        except Exception:
            # Do not leak the connection when the greeting or capability
            # exchange fails; the original exception is re-raised.
            try:
                self.shutdown()
            except OSError:
                pass
            raise

    def _mode_ascii(self):
        """Use ASCII for commands and ASCII-only digit matching."""
        self.utf8_enabled = False
        self._encoding = 'ascii'
        self.Literal = re.compile(_Literal, re.ASCII)
        self.Untagged_status = re.compile(_Untagged_status, re.ASCII)

    def _mode_utf8(self):
        """Use UTF-8 for commands (entered by enable('UTF8=ACCEPT'))."""
        self.utf8_enabled = True
        self._encoding = 'utf-8'
        self.Literal = re.compile(_Literal)
        self.Untagged_status = re.compile(_Untagged_status)

    def _connect(self):
        """Read the greeting, set the initial state and get capabilities.

        Raises self.error if the greeting is neither OK nor PREAUTH, or if
        the server advertises none of AllowedVersions.
        """
        # Create a tag prefix for this session and compile the matcher
        # for tagged responses carrying that prefix.
        self.tagpre = Int2AP(random.randint(4096, 65535))
        self.tagre = re.compile(br'(?P<tag>'
                        + self.tagpre
                        + br'\d+) (?P<type>[A-Z]+) (?P<data>.*)', re.ASCII)

        if __debug__:
            self._cmd_log_len = 10
            self._cmd_log_idx = 0
            self._cmd_log = {}           # Last '_cmd_log_len' interactions
            if self.debug >= 1:
                self._mesg('imaplib version %s' % __version__)
                self._mesg('new IMAP4 connection, tag=%s' % self.tagpre)

        self.welcome = self._get_response()
        if 'PREAUTH' in self.untagged_responses:
            self.state = 'AUTH'
        elif 'OK' in self.untagged_responses:
            self.state = 'NONAUTH'
        else:
            raise self.error(self.welcome)

        self._get_capabilities()
        if __debug__:
            if self.debug >= 3:
                self._mesg('CAPABILITIES: %r' % (self.capabilities,))

        for version in AllowedVersions:
            if not version in self.capabilities:
                continue
            self.PROTOCOL_VERSION = version
            return

        raise self.error('server not IMAP4 compliant')

    def __getattr__(self, attr):
        # Allow UPPERCASE variants of IMAP4 command methods.
        if attr in Commands:
            return getattr(self, attr.lower())
        raise AttributeError("Unknown IMAP4 command: '%s'" % attr)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        if self.state == "LOGOUT":
            return

        try:
            self.logout()
        except OSError:
            pass

    #       Overridable methods

    def _create_socket(self, timeout):
        """Return a connected socket for self.host / self.port.

        Raises ValueError for timeout=0 (non-blocking is not supported).
        """
        if timeout is not None and not timeout:
            raise ValueError('Non-blocking socket (timeout=0) is not supported')
        # An empty host is passed to create_connection() as None.
        host = None if not self.host else self.host
        sys.audit("imaplib.open", self, self.host, self.port)
        address = (host, self.port)
        if timeout is not None:
            return socket.create_connection(address, timeout)
        return socket.create_connection(address)

    def open(self, host='', port=IMAP4_PORT, timeout=None):
        """Set up the connection to *host* on *port*.

        The connection is used by the read, readline, send and shutdown
        methods, all of which may be overridden.
        """
        self.host = host
        self.port = port
        self.sock = self._create_socket(timeout)
        self._file = self.sock.makefile('rb')

    @property
    def file(self):
        """The socket's file object; access emits a RuntimeWarning."""
        import warnings
        warnings.warn(
            'IMAP4.file is unsupported, can cause errors, and may be removed.',
            RuntimeWarning,
            stacklevel=2)
        return self._file

    def read(self, size):
        """Read up to *size* bytes from the server.

        Data is taken from self._readbuf first and then from the socket in
        DEFAULT_BUFFER_SIZE chunks.  Fewer than *size* bytes are returned
        when the connection ends or fails first.
        """
        parts = []

        while size > 0:

            if len(parts) < len(self._readbuf):
                buf = self._readbuf[len(parts)]
            else:
                try:
                    buf = self.sock.recv(DEFAULT_BUFFER_SIZE)
                except ConnectionError:
                    break
                if not buf:
                    break
                self._readbuf.append(buf)

            if len(buf) >= size:
                parts.append(buf[:size])
                # Keep the unread tail of this chunk plus any later chunks.
                self._readbuf = [buf[size:]] + self._readbuf[len(parts):]
                break
            parts.append(buf)
            size -= len(buf)

        return b''.join(parts)

    def readline(self):
        """Read one line (including the LF) from the server.

        Raises self.error if more than _MAXLINE bytes were collected.
        """
        LF = b'\n'
        parts = []
        length = 0

        while length < _MAXLINE:

            if len(parts) < len(self._readbuf):
                buf = self._readbuf[len(parts)]
            else:
                try:
                    buf = self.sock.recv(DEFAULT_BUFFER_SIZE)
                except ConnectionError:
                    break
                if not buf:
                    break
                self._readbuf.append(buf)

            pos = buf.find(LF)
            if pos != -1:
                pos += 1
                parts.append(buf[:pos])
                # Keep the unread tail of this chunk plus any later chunks.
                self._readbuf = [buf[pos:]] + self._readbuf[len(parts):]
                break
            parts.append(buf)
            length += len(buf)

        line = b''.join(parts)
        if len(line) > _MAXLINE:
            raise self.error("got more than %d bytes" % _MAXLINE)
        return line

    def send(self, data):
        """Send *data* (bytes) to the server."""
        sys.audit("imaplib.send", self, data)
        self.sock.sendall(data)

    def shutdown(self):
        """Close the file object and the socket opened by open()."""
        self._file.close()
        try:
            self.sock.shutdown(socket.SHUT_RDWR)
        except OSError as exc:
            # Ignore "not connected" (ENOTCONN, or winerror 10022 on
            # Windows); anything else is re-raised.
            if (exc.errno != errno.ENOTCONN
               and getattr(exc, 'winerror', 0) != 10022):
                raise
        finally:
            self.sock.close()

    def socket(self):
        """Return the socket used for the connection."""
        return self.sock

    #       Utility methods

    def recent(self):
        """Return the stored RECENT responses, prompting with NOOP if none.

        (typ, [data]) = <instance>.recent()
        """
        name = 'RECENT'
        typ, dat = self._untagged_response('OK', [None], name)
        if dat[-1]:
            return typ, dat
        typ, dat = self.noop()  # Prod server for response
        return self._untagged_response(typ, dat, name)

    def response(self, code):
        """Return and remove the stored untagged responses for *code*.

        (code, [data]) = <instance>.response(code)
        """
        return self._untagged_response(code, [None], code.upper())

    #       IMAP4 commands

    def append(self, mailbox, flags, date_time, message):
        """Append *message* (bytes) to *mailbox* (default 'INBOX').

        *flags* is wrapped in parentheses when it is not already, and
        *date_time* is converted with Time2Internaldate(); both are omitted
        from the command when false.  Line endings in *message* are
        normalised to CRLF.
        """
        name = 'APPEND'
        if not mailbox:
            mailbox = 'INBOX'
        if flags:
            if (flags[0],flags[-1]) != ('(',')'):
                flags = '(%s)' % flags
        else:
            flags = None
        if date_time:
            date_time = Time2Internaldate(date_time)
        else:
            date_time = None
        literal = MapCRLF.sub(CRLF, message)
        self.literal = literal
        return self._simple_command(name, mailbox, flags, date_time)

    def authenticate(self, mechanism, authobject):
        """Run AUTHENTICATE with *mechanism*.

        *authobject* is called with each decoded server challenge and must
        return the response data, or None to abort the exchange (see
        _Authenticator).  Raises self.error unless the server answers OK;
        on success the state becomes 'AUTH'.
        """
        mech = mechanism.upper()
        self.literal = _Authenticator(authobject).process
        typ, dat = self._simple_command('AUTHENTICATE', mech)
        if typ != 'OK':
            raise self.error(dat[-1].decode('utf-8', 'replace'))
        self.state = 'AUTH'
        return typ, dat

    def capability(self):
        """(typ, [data]) = <instance>.capability()"""
        name = 'CAPABILITY'
        typ, dat = self._simple_command(name)
        return self._untagged_response(typ, dat, name)

    def check(self):
        """(typ, [data]) = <instance>.check()"""
        return self._simple_command('CHECK')

    def close(self):
        """Send CLOSE; the state becomes 'AUTH' even if the command fails."""
        try:
            typ, dat = self._simple_command('CLOSE')
        finally:
            self.state = 'AUTH'
        return typ, dat

    def copy(self, message_set, new_mailbox):
        """(typ, [data]) = <instance>.copy(message_set, new_mailbox)"""
        return self._simple_command('COPY', message_set, new_mailbox)

    def create(self, mailbox):
        """(typ, [data]) = <instance>.create(mailbox)"""
        return self._simple_command('CREATE', mailbox)

    def delete(self, mailbox):
        """(typ, [data]) = <instance>.delete(mailbox)"""
        return self._simple_command('DELETE', mailbox)

    def deleteacl(self, mailbox, who):
        """(typ, [data]) = <instance>.deleteacl(mailbox, who)"""
        return self._simple_command('DELETEACL', mailbox, who)

    def enable(self, capability):
        """Send ENABLE for *capability*.

        Raises IMAP4.error if the server did not advertise ENABLE.  A
        successful request containing 'UTF8=ACCEPT' switches the instance
        to UTF-8 mode.
        """
        if 'ENABLE' not in self.capabilities:
            raise IMAP4.error("Server does not support ENABLE")
        typ, data = self._simple_command('ENABLE', capability)
        if typ == 'OK' and 'UTF8=ACCEPT' in capability.upper():
            self._mode_utf8()
        return typ, data

    def expunge(self):
        """(typ, [data]) = <instance>.expunge(); data are EXPUNGE responses."""
        name = 'EXPUNGE'
        typ, dat = self._simple_command(name)
        return self._untagged_response(typ, dat, name)

    def fetch(self, message_set, message_parts):
        """(typ, [data, ...]) = <instance>.fetch(message_set, message_parts)"""
        name = 'FETCH'
        typ, dat = self._simple_command(name, message_set, message_parts)
        return self._untagged_response(typ, dat, name)

    def getacl(self, mailbox):
        """(typ, [data]) = <instance>.getacl(mailbox); data are ACL responses."""
        typ, dat = self._simple_command('GETACL', mailbox)
        return self._untagged_response(typ, dat, 'ACL')

    def getannotation(self, mailbox, entry, attribute):
        """(typ, [data]) = <instance>.getannotation(mailbox, entry, attribute)"""
        typ, dat = self._simple_command('GETANNOTATION', mailbox, entry, attribute)
        return self._untagged_response(typ, dat, 'ANNOTATION')

    def getquota(self, root):
        """(typ, [data]) = <instance>.getquota(root); data are QUOTA responses."""
        typ, dat = self._simple_command('GETQUOTA', root)
        return self._untagged_response(typ, dat, 'QUOTA')

    def getquotaroot(self, mailbox):
        """(typ, [[QUOTAROOT responses...], [QUOTA responses]]) =
        <instance>.getquotaroot(mailbox)
        """
        typ, dat = self._simple_command('GETQUOTAROOT', mailbox)
        typ, quota = self._untagged_response(typ, dat, 'QUOTA')
        typ, quotaroot = self._untagged_response(typ, dat, 'QUOTAROOT')
        return typ, [quotaroot, quota]

    def idle(self, duration=None):
        """Return an Idler (iterable context manager) for an IDLE session."""
        return Idler(self, duration)

    def list(self, directory='""', pattern='*'):
        """(typ, [data]) = <instance>.list(directory='""', pattern='*')"""
        name = 'LIST'
        typ, dat = self._simple_command(name, directory, pattern)
        return self._untagged_response(typ, dat, name)

    def login(self, user, password):
        """Log in with a plain-text password (sent as a quoted string).

        Raises self.error unless the server answers OK; on success the
        state becomes 'AUTH'.
        """
        typ, dat = self._simple_command('LOGIN', user, self._quote(password))
        if typ != 'OK':
            raise self.error(dat[-1])
        self.state = 'AUTH'
        return typ, dat

    def login_cram_md5(self, user, password):
        """Authenticate with CRAM-MD5 using *user* and *password*."""
        self.user, self.password = user, password
        return self.authenticate('CRAM-MD5', self._CRAM_MD5_AUTH)

    def _CRAM_MD5_AUTH(self, challenge):
        """Authobject for CRAM-MD5: return 'user hexdigest' for *challenge*."""
        import hmac

        if isinstance(self.password, str):
            password = self.password.encode('utf-8')
        else:
            password = self.password

        try:
            authcode = hmac.HMAC(password, challenge, 'md5')
        except ValueError:
            raise self.error("CRAM-MD5 authentication is not supported")
        return f"{self.user} {authcode.hexdigest()}"

    def logout(self):
        """Send LOGOUT and shut the connection down.

        Returns the server's reply; _get_tagged_response() reports an
        untagged BYE as ('BYE', [data]).
        """
        self.state = 'LOGOUT'
        typ, dat = self._simple_command('LOGOUT')
        self.shutdown()
        return typ, dat

    def lsub(self, directory='""', pattern='*'):
        """(typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*')"""
        name = 'LSUB'
        typ, dat = self._simple_command(name, directory, pattern)
        return self._untagged_response(typ, dat, name)

    def myrights(self, mailbox):
        """(typ, [data]) = <instance>.myrights(mailbox)"""
        typ,dat = self._simple_command('MYRIGHTS', mailbox)
        return self._untagged_response(typ, dat, 'MYRIGHTS')

    def namespace(self):
        """(typ, [data, ...]) = <instance>.namespace()"""
        name = 'NAMESPACE'
        typ, dat = self._simple_command(name)
        return self._untagged_response(typ, dat, name)

    def noop(self):
        """(typ, [data]) = <instance>.noop()"""
        if __debug__:
            if self.debug >= 3:
                self._dump_ur(self.untagged_responses)
        return self._simple_command('NOOP')

    def partial(self, message_num, message_part, start, length):
        """(typ, [data, ...]) = <instance>.partial(message_num, message_part,
        start, length); data are the FETCH responses.
        """
        name = 'PARTIAL'
        typ, dat = self._simple_command(name, message_num, message_part, start, length)
        return self._untagged_response(typ, dat, 'FETCH')

    def proxyauth(self, user):
        """(typ, [data]) = <instance>.proxyauth(user)"""
        name = 'PROXYAUTH'
        return self._simple_command(name, user)

    def rename(self, oldmailbox, newmailbox):
        """(typ, [data]) = <instance>.rename(oldmailbox, newmailbox)"""
        return self._simple_command('RENAME', oldmailbox, newmailbox)

    def search(self, charset, *criteria):
        """(typ, [data]) = <instance>.search(charset, criterion, ...)

        A true *charset* is sent as 'CHARSET <charset>' and raises
        IMAP4.error in UTF-8 mode.
        """
        name = 'SEARCH'
        if charset:
            if self.utf8_enabled:
                raise IMAP4.error("Non-None charset not valid in UTF8 mode")
            typ, dat = self._simple_command(name, 'CHARSET', charset, *criteria)
        else:
            typ, dat = self._simple_command(name, *criteria)
        return self._untagged_response(typ, dat, name)

    def select(self, mailbox='INBOX', readonly=False):
        """Select *mailbox* (EXAMINE is used when *readonly* is true).

        Stored untagged responses are discarded first.  Returns
        (typ, [EXISTS data]) on success.  Raises self.readonly when the
        server reports READ-ONLY although *readonly* was false.
        """
        self.untagged_responses = {}    # Flush old responses.
        self.is_readonly = readonly
        if readonly:
            name = 'EXAMINE'
        else:
            name = 'SELECT'
        typ, dat = self._simple_command(name, mailbox)
        if typ != 'OK':
            self.state = 'AUTH'     # Might have been 'SELECTED'
            return typ, dat
        self.state = 'SELECTED'
        if 'READ-ONLY' in self.untagged_responses \
                and not readonly:
            if __debug__:
                if self.debug >= 1:
                    self._dump_ur(self.untagged_responses)
            raise self.readonly('%s is not writable' % mailbox)
        return typ, self.untagged_responses.get('EXISTS', [None])

    def setacl(self, mailbox, who, what):
        """(typ, [data]) = <instance>.setacl(mailbox, who, what)"""
        return self._simple_command('SETACL', mailbox, who, what)

    def setannotation(self, *args):
        """(typ, [data]) = <instance>.setannotation(mailbox[, entry, attribute]+)"""
        typ, dat = self._simple_command('SETANNOTATION', *args)
        return self._untagged_response(typ, dat, 'ANNOTATION')

    def setquota(self, root, limits):
        """(typ, [data]) = <instance>.setquota(root, limits)"""
        typ, dat = self._simple_command('SETQUOTA', root, limits)
        return self._untagged_response(typ, dat, 'QUOTA')

    def sort(self, sort_criteria, charset, *search_criteria):
        """(typ, [data]) = <instance>.sort(sort_criteria, charset,
        search_criteria, ...); *sort_criteria* is parenthesised if needed.
        """
        name = 'SORT'
        if (sort_criteria[0],sort_criteria[-1]) != ('(',')'):
            sort_criteria = '(%s)' % sort_criteria
        typ, dat = self._simple_command(name, sort_criteria, charset, *search_criteria)
        return self._untagged_response(typ, dat, name)

    def starttls(self, ssl_context=None):
        """Upgrade the connection to TLS and refresh the capabilities.

        Raises self.error if ssl is unavailable or the server refuses, and
        self.abort if TLS is already active or STARTTLS is not advertised.
        """
        name = 'STARTTLS'
        if not HAVE_SSL:
            raise self.error('SSL support missing')
        if self._tls_established:
            raise self.abort('TLS session already established')
        if name not in self.capabilities:
            raise self.abort('TLS not supported by server')
        # Generate a default SSL context if none was passed.
        if ssl_context is None:
            ssl_context = ssl._create_stdlib_context()
        typ, dat = self._simple_command(name)
        if typ == 'OK':
            self.sock = ssl_context.wrap_socket(self.sock,
                                                server_hostname=self.host)
            self._file = self.sock.makefile('rb')
            self._tls_established = True
            self._get_capabilities()
        else:
            raise self.error("Couldn't establish TLS session")
        return self._untagged_response(typ, dat, name)

    def status(self, mailbox, names):
        """(typ, [data]) = <instance>.status(mailbox, names)"""
        name = 'STATUS'
        typ, dat = self._simple_command(name, mailbox, names)
        return self._untagged_response(typ, dat, name)

    def store(self, message_set, command, flags):
        """(typ, [data]) = <instance>.store(message_set, command, flags)

        *flags* is wrapped in parentheses when it is not already; an empty
        string is sent as the empty list '()'.
        """
        # Slices instead of indexes so that an empty flag string becomes
        # '()' rather than raising IndexError.
        if (flags[:1], flags[-1:]) != ('(', ')'):
            flags = '(%s)' % flags  # Avoid quoting the flags
        typ, dat = self._simple_command('STORE', message_set, command, flags)
        return self._untagged_response(typ, dat, 'FETCH')

    def subscribe(self, mailbox):
        """(typ, [data]) = <instance>.subscribe(mailbox)"""
        return self._simple_command('SUBSCRIBE', mailbox)

    def thread(self, threading_algorithm, charset, *search_criteria):
        """(typ, [data]) = <instance>.thread(threading_algorithm, charset,
        search_criteria, ...)
        """
        name = 'THREAD'
        typ, dat = self._simple_command(name, threading_algorithm, charset, *search_criteria)
        return self._untagged_response(typ, dat, name)

    def uid(self, command, *args):
        """Run *command* with UIDs instead of message numbers.

        Returns the SEARCH/SORT/THREAD responses for those commands and
        the FETCH responses for every other command.  Raises self.error
        for an unknown command or one not allowed in the current state.
        """
        command = command.upper()
        if not command in Commands:
            raise self.error("Unknown IMAP4 UID command: %s" % command)
        if self.state not in Commands[command]:
            raise self.error("command %s illegal in state %s, "
                             "only allowed in states %s" %
                             (command, self.state,
                              ', '.join(Commands[command])))
        name = 'UID'
        typ, dat = self._simple_command(name, command, *args)
        if command in ('SEARCH', 'SORT', 'THREAD'):
            name = command
        else:
            name = 'FETCH'
        return self._untagged_response(typ, dat, name)

    def unsubscribe(self, mailbox):
        """(typ, [data]) = <instance>.unsubscribe(mailbox)"""
        return self._simple_command('UNSUBSCRIBE', mailbox)

    def unselect(self):
        """Send UNSELECT; the state becomes 'AUTH' even if the command fails."""
        try:
            typ, data = self._simple_command('UNSELECT')
        finally:
            self.state = 'AUTH'
        return typ, data

    def xatom(self, name, *args):
        """Send an extension command.

        An unknown *name* is registered in ``Commands`` as valid for the
        current state before the command is sent.
        """
        name = name.upper()
        if not name in Commands:
            Commands[name] = (self.state,)
        return self._simple_command(name, *args)

    #       Private methods

    def _append_untagged(self, typ, dat):
        """Store one untagged response fragment.

        While _idle_capture is set the fragment goes to the
        _idle_responses queue, otherwise to untagged_responses[typ].
        """
        if dat is None:
            dat = b''

        # During idle, queue untagged responses for delivery via iteration.
        if self._idle_capture:
            # A response with literals arrives as several fragments: tuples
            # for the literal parts and a final bytes trailer.  A queue
            # entry whose last fragment is bytes is therefore complete.
            if (not self._idle_responses or
                isinstance(self._idle_responses[-1][1][-1], bytes)):
                # Not continuing a fragmented response; start a new one.
                self._idle_responses.append((typ, [dat]))
            else:
                # Continuing a fragmented response; append the fragment.
                response = self._idle_responses[-1]
                assert response[0] == typ
                response[1].append(dat)
            if __debug__ and self.debug >= 5:
                self._mesg(f'idle: queue untagged {typ} {dat!r}')
            return

        ur = self.untagged_responses
        if __debug__:
            if self.debug >= 5:
                self._mesg('untagged_responses[%s] %s += ["%r"]' %
                        (typ, len(ur.get(typ,'')), dat))
        if typ in ur:
            ur[typ].append(dat)
        else:
            ur[typ] = [dat]

    def _check_bye(self):
        """Raise self.abort if an untagged BYE has been stored."""
        bye = self.untagged_responses.get('BYE')
        if bye:
            raise self.abort(bye[-1].decode(self._encoding, 'replace'))

    def _command(self, name, *args):
        """Send command *name* with *args* and return its tag.

        None arguments are skipped and str arguments are encoded.  If
        self.literal is set it is consumed: bytes are sent as one literal
        after the server's continuation request, while a bound method is
        called with each continuation response to produce the data to
        send.  Raises self.error if the command is illegal in the current
        state, self.readonly if the mailbox became READ-ONLY, and
        self.abort on socket errors.
        """
        if self.state not in Commands[name]:
            self.literal = None
            raise self.error("command %s illegal in state %s, "
                             "only allowed in states %s" %
                             (name, self.state,
                              ', '.join(Commands[name])))

        # Forget status responses left over from the previous command.
        for typ in ('OK', 'NO', 'BAD'):
            if typ in self.untagged_responses:
                del self.untagged_responses[typ]

        if 'READ-ONLY' in self.untagged_responses \
        and not self.is_readonly:
            raise self.readonly('mailbox status changed to READ-ONLY')

        tag = self._new_tag()
        name = bytes(name, self._encoding)
        data = tag + b' ' + name
        for arg in args:
            if arg is None: continue
            if isinstance(arg, str):
                arg = bytes(arg, self._encoding)
            data = data + b' ' + arg

        literal = self.literal
        if literal is not None:
            self.literal = None
            if type(literal) is type(self._command):
                # A bound method: called once per continuation request.
                literator = literal
            else:
                literator = None
                if self.utf8_enabled:
                    data = data + bytes(' UTF8 (~{%s}' % len(literal), self._encoding)
                    literal = literal + b')'
                else:
                    data = data + bytes(' {%s}' % len(literal), self._encoding)

        if __debug__:
            if self.debug >= 4:
                self._mesg('> %r' % data)
            else:
                self._log('> %r' % data)

        try:
            self.send(data + CRLF)
        except OSError as val:
            raise self.abort('socket error: %s' % val)

        if literal is None:
            return tag

        while 1:
            # Wait for continuation response (_get_response() returns None).

            while self._get_response():
                if self.tagged_commands[tag]:   # Command already completed
                    return tag

            # Send literal

            if literator:
                literal = literator(self.continuation_response)

            if __debug__:
                if self.debug >= 4:
                    self._mesg('write literal size %s' % len(literal))

            try:
                self.send(literal)
                self.send(CRLF)
            except OSError as val:
                raise self.abort('socket error: %s' % val)

            if not literator:
                break

        return tag

    def _command_complete(self, name, tag):
        """Wait for the tagged completion of *tag* and return (typ, data).

        Raises self.abort on BYE (except for LOGOUT) and self.error when
        the server answers BAD.
        """
        logout = (name == 'LOGOUT')
        # BYE is expected after LOGOUT
        if not logout:
            self._check_bye()
        try:
            typ, data = self._get_tagged_response(tag, expect_bye=logout)
        except self.abort as val:
            raise self.abort('command: %s => %s' % (name, val))
        except self.error as val:
            raise self.error('command: %s => %s' % (name, val))
        if not logout:
            self._check_bye()
        if typ == 'BAD':
            raise self.error('%s command error: %s %s' % (name, typ, data))
        return typ, data

    def _get_capabilities(self):
        """Query CAPABILITY and store the upper-cased tuple in
        self.capabilities.
        """
        typ, dat = self.capability()
        if dat == [None]:
            raise self.error('no CAPABILITY response from server')
        dat = str(dat[-1], self._encoding)
        dat = dat.upper()
        self.capabilities = tuple(dat.split())

    def _get_response(self, start_timeout=False):
        """Read one response from the server and store it.

        Returns None for a continuation ('+') response, otherwise the
        first response line received.

        If *start_timeout* is given (anything but False) and a socket
        exists, it is used temporarily as the socket timeout while waiting
        for the response line, and _responsetimeout is raised if it
        expires.
        """
        if start_timeout is not False and self.sock:
            assert start_timeout is None or start_timeout > 0
            saved_timeout = self.sock.gettimeout()
            self.sock.settimeout(start_timeout)
            try:
                resp = self._get_line()
            except TimeoutError as err:
                raise self._responsetimeout from err
            finally:
                self.sock.settimeout(saved_timeout)
        else:
            resp = self._get_line()

        # Command completion response?

        if self._match(self.tagre, resp):
            tag = self.mo.group('tag')
            if not tag in self.tagged_commands:
                raise self.abort('unexpected tagged response: %r' % resp)

            typ = self.mo.group('type')
            typ = str(typ, self._encoding)
            dat = self.mo.group('data')
            self.tagged_commands[tag] = (typ, [dat])
        else:
            dat2 = None

            # '*' (untagged) responses?

            if not self._match(Untagged_response, resp):
                if self._match(self.Untagged_status, resp):
                    dat2 = self.mo.group('data2')

            if self.mo is None:
                # Only other possibility is '+' (continuation) response...

                if self._match(Continuation, resp):
                    self.continuation_response = self.mo.group('data')
                    return None     # NB: indicates continuation

                raise self.abort("unexpected response: %r" % resp)

            typ = self.mo.group('type')
            typ = str(typ, self._encoding)
            dat = self.mo.group('data')
            if dat is None: dat = b''        # Null untagged response
            if dat2: dat = dat + b' ' + dat2

            # Is there a literal to come?

            while self._match(self.Literal, dat):

                # Read literal direct from connection.

                size = int(self.mo.group('size'))
                if __debug__:
                    if self.debug >= 4:
                        self._mesg('read literal size %s' % size)
                data = self.read(size)

                # Store response with literal as tuple

                self._append_untagged(typ, (dat, data))

                # Read trailer - possibly containing another literal

                dat = self._get_line()

            self._append_untagged(typ, dat)

        # Bracketed response information?

        if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat):
            typ = self.mo.group('type')
            typ = str(typ, self._encoding)
            self._append_untagged(typ, self.mo.group('data'))

        if __debug__:
            if self.debug >= 1 and typ in ('NO', 'BAD', 'BYE'):
                self._mesg('%s response: %r' % (typ, dat))

        return resp

    def _get_tagged_response(self, tag, expect_bye=False):
        """Read responses until the one for *tag* arrives; return it.

        With *expect_bye* a stored untagged BYE is returned as
        ('BYE', data) instead.
        """
        while 1:
            result = self.tagged_commands[tag]
            if result is not None:
                del self.tagged_commands[tag]
                return result

            if expect_bye:
                typ = 'BYE'
                bye = self.untagged_responses.pop(typ, None)
                if bye is not None:
                    # Server replied to the command with BYE.
                    return (typ, bye)

            # Report a BYE seen so far before trying to read again.
            self._check_bye()

            try:
                self._get_response()
            except self.abort as val:
                if __debug__:
                    if self.debug >= 1:
                        self.print_log()
                raise

    def _get_line(self):
        """Return the next response line without its CRLF.

        Raises self.abort on EOF or on a line not terminated by CRLF.
        """
        line = self.readline()
        if not line:
            raise self.abort('socket error: EOF')

        # Every line must be terminated by CRLF.
        if not line.endswith(b'\r\n'):
            raise self.abort('socket error: unterminated line: %r' % line)

        line = line[:-2]
        if __debug__:
            if self.debug >= 4:
                self._mesg('< %r' % line)
            else:
                self._log('< %r' % line)
        return line

    def _match(self, cre, s):
        """Match *s* against *cre*, saving the result in self.mo.

        Returns True if the pattern matched.
        """
        self.mo = cre.match(s)
        if __debug__:
            if self.mo is not None and self.debug >= 5:
                self._mesg("\tmatched %r => %r" % (cre.pattern, self.mo.groups()))
        return self.mo is not None

    def _new_tag(self):
        """Return a fresh tag and register it in tagged_commands."""
        tag = self.tagpre + bytes(str(self.tagnum), self._encoding)
        self.tagnum = self.tagnum + 1
        self.tagged_commands[tag] = None
        return tag

    def _quote(self, arg):
        """Return *arg* as a double-quoted string with \\ and " escaped."""
        arg = arg.replace('\\', '\\\\')
        arg = arg.replace('"', '\\"')

        return '"' + arg + '"'

    def _simple_command(self, name, *args):
        """Send a command and wait for its completion; return (typ, data)."""
        return self._command_complete(name, self._command(name, *args))

    def _untagged_response(self, typ, dat, name):
        """Return (typ, data) using the stored untagged *name* responses.

        For typ 'NO' the given *dat* is returned unchanged; if nothing is
        stored under *name* the data is [None].  Returned responses are
        removed from untagged_responses.
        """
        if typ == 'NO':
            return typ, dat
        if not name in self.untagged_responses:
            return typ, [None]
        data = self.untagged_responses.pop(name)
        if __debug__:
            if self.debug >= 5:
                self._mesg('untagged_responses[%s] => %s' % (name, data))
        return typ, data

    if __debug__:

        def _mesg(self, s, secs=None):
            """Write *s* to stderr prefixed with a minutes:seconds stamp."""
            if secs is None:
                secs = time.time()
            tm = time.strftime('%M:%S', time.localtime(secs))
            sys.stderr.write(' %s.%02d %s\n' % (tm, (secs*100)%100, s))
            sys.stderr.flush()

        def _dump_ur(self, untagged_resp_dict):
            """Write a dump of *untagged_resp_dict* via _mesg()."""
            if not untagged_resp_dict:
                return
            items = (f'{key}: {value!r}'
                    for key, value in untagged_resp_dict.items())
            self._mesg('untagged responses dump:' + '\n\t\t'.join(items))

        def _log(self, line):
            """Keep *line* in a ring of the last _cmd_log_len entries."""
            self._cmd_log[self._cmd_log_idx] = (line, time.time())
            self._cmd_log_idx += 1
            if self._cmd_log_idx >= self._cmd_log_len:
                self._cmd_log_idx = 0

        def print_log(self):
            """Write the logged interactions, oldest slot first."""
            self._mesg('last %d IMAP4 interactions:' % len(self._cmd_log))
            i, n = self._cmd_log_idx, self._cmd_log_len
            while n:
                try:
                    self._mesg(*self._cmd_log[i])
                except Exception:
                    # Best effort: slots not yet filled raise KeyError.
                    # KeyboardInterrupt/SystemExit are no longer swallowed.
                    pass
                i += 1
                if i >= self._cmd_log_len:
                    i = 0
                n -= 1
class Idler:
    """Iterable context manager that runs an IMAP4 IDLE session.

    Entering the context sends IDLE and waits for the continuation
    request; iterating yields the queued untagged responses as
    (type, [data, ...]) tuples; leaving the context sends DONE.

    *imap* is the client object and *duration*, when not None, is the
    number of seconds after which iteration stops.  The constructor
    raises imap.error if IDLE is not in the client's capabilities, or if a
    duration is requested on a client without a socket.
    """

    def __init__(self, imap, duration=None):
        if 'IDLE' not in imap.capabilities:
            raise imap.error("Server does not support IMAP4 IDLE")
        if not (duration is None or imap.sock):
            raise imap.error('duration requires a socket connection')
        self._imap = imap
        self._duration = duration
        self._deadline = None       # monotonic end time, set by __enter__
        self._tag = None            # tag of the pending IDLE command
        self._saved_state = None    # client state to restore in __exit__

    def __enter__(self):
        client = self._imap
        assert not client._idle_responses
        assert not client._idle_capture

        if __debug__ and client.debug >= 4:
            client._mesg(f'idle start duration={self._duration}')

        # Capture starts before IDLE is sent so that untagged responses
        # arriving ahead of the continuation request land in the queue.
        client._idle_capture = True

        try:
            self._tag = client._command('IDLE')
            # _get_response() returns None for the continuation request;
            # any other line is an unrelated response or a tagged refusal.
            while True:
                resp = client._get_response()
                if not resp:
                    break
                if client.tagged_commands[self._tag]:
                    typ, data = client.tagged_commands.pop(self._tag)
                    if typ == 'NO':
                        raise client.error(f'idle denied: {data}')
                    raise client.abort(f'unexpected status response: {resp}')

            if __debug__ and client.debug >= 4:
                prompt = client.continuation_response
                client._mesg(f'idle continuation prompt: {prompt}')
        except BaseException:
            client._idle_capture = False
            raise

        if self._duration is not None:
            self._deadline = time.monotonic() + self._duration

        self._saved_state = client.state
        client.state = 'IDLING'

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        client = self._imap

        if __debug__ and client.debug >= 4:
            client._mesg('idle done')
        client.state = self._saved_state

        # Capture stops before DONE is sent.
        client._idle_capture = False

        # Responses that were queued but never iterated are handed over to
        # the client's regular untagged-response store, one fragment at a
        # time.
        queue = client._idle_responses
        leftovers = len(queue)
        if leftovers:
            if __debug__ and client.debug >= 4:
                client._mesg(f'idle quit with {leftovers} leftover responses')
            while queue:
                typ, fragments = queue.pop(0)
                for fragment in fragments:
                    client._append_untagged(typ, fragment)

        try:
            client.send(b'DONE' + CRLF)
            status, [msg] = client._command_complete('IDLE', self._tag)
            if __debug__ and client.debug >= 4:
                client._mesg(f'idle status: {status} {msg!r}')
        except OSError:
            # Keep the context body's own exception if there is one.
            if not exc_type:
                raise

        return False  # Do not suppress context body exceptions

    def __iter__(self):
        return self

    def _pop(self, timeout, default=('', None)):
        """Return the next queued response, or *default* on timeout.

        *timeout* is a number of seconds, or None to wait without a
        timeout.  self._duration is not consulted here.  Raises
        imap.error when the client is not in the 'IDLING' state.
        """
        client = self._imap
        if client.state != 'IDLING':
            raise client.error('_pop() only works during IDLE')

        verbose = __debug__ and client.debug >= 4

        if client._idle_responses:
            # A response is already queued.
            queued = client._idle_responses.pop(0)
            if verbose:
                client._mesg(f'idle _pop({timeout}) de-queued {queued[0]}')
            return queued

        if verbose:
            client._mesg(f'idle _pop({timeout}) reading')

        if timeout is not None:
            if timeout <= 0:
                return default
            timeout = float(timeout)

        try:
            # Reads a response; captured data lands in _idle_responses.
            client._get_response(timeout)
        except IMAP4._responsetimeout:
            if verbose:
                client._mesg(f'idle _pop({timeout}) done')
            return default

        received = client._idle_responses.pop(0)

        if verbose:
            client._mesg(f'idle _pop({timeout}) read {received[0]}')
        return received

    def __next__(self):
        client = self._imap

        remaining = (None if self._duration is None
                     else self._deadline - time.monotonic())
        typ, data = self._pop(remaining)

        if typ:
            return typ, data

        if __debug__ and client.debug >= 4:
            client._mesg('idle iterator exhausted')
        raise StopIteration

    def burst(self, interval=0.1):
        """Yield a burst of responses no more than *interval* seconds apart.

        The first response is obtained like a normal iteration step; the
        generator ends when no further response arrives within *interval*.
        Raises imap.error if the client has no socket.
        """
        if not self._imap.sock:
            raise self._imap.error('burst() requires a socket connection')

        try:
            yield next(self)
        except StopIteration:
            return

        while True:
            response = self._pop(interval, None)
            if not response:
                return
            yield response
if HAVE_SSL:

    class IMAP4_SSL(IMAP4):
        """IMAP4 client whose socket is wrapped with TLS on connect.

        Instantiate with IMAP4_SSL(host='', port=IMAP4_SSL_PORT, *,
        ssl_context=None, timeout=None).  When *ssl_context* is None a
        default context is created; the context is stored on the instance
        as ``ssl_context``.
        """

        def __init__(self, host='', port=IMAP4_SSL_PORT,
                     *, ssl_context=None, timeout=None):
            # The context must exist before IMAP4.__init__ opens the
            # connection, because _create_socket() reads it.
            self.ssl_context = (ssl._create_stdlib_context()
                                if ssl_context is None else ssl_context)
            IMAP4.__init__(self, host, port, timeout)

        def _create_socket(self, timeout):
            """Connect as IMAP4 does, then wrap the socket with TLS."""
            plain_sock = IMAP4._create_socket(self, timeout)
            return self.ssl_context.wrap_socket(
                plain_sock, server_hostname=self.host)

        def open(self, host='', port=IMAP4_SSL_PORT, timeout=None):
            """Same as IMAP4.open(), with IMAP4_SSL_PORT as default port."""
            IMAP4.open(self, host, port, timeout)

    __all__.append("IMAP4_SSL")
class IMAP4_stream(IMAP4):
    """IMAP4 client that talks to a subprocess instead of a socket.

    Instantiate with IMAP4_stream(command).  *command* is started by
    open() and the protocol is spoken over the child's stdin/stdout.
    """

    def __init__(self, command):
        self.command = command
        IMAP4.__init__(self)

    def open(self, host=None, port=None, timeout=None):
        """Start self.command; the arguments are accepted but ignored."""
        # There is no network endpoint; the parent's attributes are
        # still defined, as None.
        self.host = None
        self.port = None
        self.sock = None
        self._file = None
        # NOTE(review): the command string is run through the shell
        # (shell=True), so it must never be built from untrusted input.
        self.process = subprocess.Popen(
            self.command,
            bufsize=DEFAULT_BUFFER_SIZE,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            shell=True,
            close_fds=True,
        )
        child = self.process
        self.writefile = child.stdin
        self.readfile = child.stdout

    def read(self, size):
        """Read *size* bytes from the child's stdout."""
        return self.readfile.read(size)

    def readline(self):
        """Read one line from the child's stdout."""
        return self.readfile.readline()

    def send(self, data):
        """Write *data* to the child's stdin and flush it."""
        pipe = self.writefile
        pipe.write(data)
        pipe.flush()

    def shutdown(self):
        """Close both pipes and wait for the child to exit."""
        self.readfile.close()
        self.writefile.close()
        self.process.wait()
class _Authenticator:
def __init__(self, mechinst):
self.mech = mechinst
def process(self, data):
ret = self.mech(self.decode(data))
if ret is None:
return b'*' return self.encode(ret)
def encode(self, inp):
oup = b''
if isinstance(inp, str):
inp = inp.encode('utf-8')
while inp:
if len(inp) > 48:
t = inp[:48]
inp = inp[48:]
else:
t = inp
inp = b''
e = binascii.b2a_base64(t)
if e:
oup = oup + e[:-1]
return oup
def decode(self, inp):
if not inp:
return b''
return binascii.a2b_base64(inp)
# Month abbreviations indexed by month number; index 0 is an empty
# placeholder so that Months[1] is 'Jan'.
Months = [
    '',
    'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
    'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec',
]
# Reverse lookup: bytes abbreviation -> month number (1-12).
Mon2num = {
    abbrev.encode(): number
    for number, abbrev in enumerate(Months)
    if number
}
def Internaldate2tuple(resp):
    """Parse an IMAP4 INTERNALDATE string.

    *resp* is a bytes response containing 'INTERNALDATE "..."'.  Returns
    the corresponding local time as a time.struct_time, or None if *resp*
    does not match the INTERNALDATE pattern.
    """
    match = InternalDate.match(resp)
    if match is None:
        return None

    month = Mon2num[match.group('mon')]
    year, day, hour, minute, second, zone_hours, zone_minutes = (
        int(match.group(field))
        for field in ('year', 'day', 'hour', 'min', 'sec', 'zoneh', 'zonem')
    )

    # Zone offset in seconds (hhmm -> seconds), negative for '-' zones.
    offset = (zone_hours * 60 + zone_minutes) * 60
    if match.group('zonen') == b'-':
        offset = -offset

    # Treat the wall-clock fields as UTC, then remove the zone offset to
    # obtain the real UTC timestamp.
    fields = (year, month, day, hour, minute, second, -1, -1, -1)
    return time.localtime(calendar.timegm(fields) - offset)
def Int2AP(num):
    """Convert an integer to bytes made of the letters A-P.

    The absolute value of *num* is written in base 16 with 'A' standing
    for 0 up to 'P' for 15.  Zero gives an empty result.
    """
    alphabet = b'ABCDEFGHIJKLMNOP'
    remaining = int(abs(num))
    digits = []
    while remaining:
        remaining, index = divmod(remaining, 16)
        digits.append(alphabet[index:index + 1])
    # Digits were produced least-significant first.
    digits.reverse()
    return b''.join(digits)
def ParseFlags(resp):
    """Return the flags of an IMAP4 FLAGS response as a tuple of bytes.

    An empty tuple is returned when *resp* has no 'FLAGS (...)' part.
    """
    match = Flags.match(resp)
    return tuple(match.group('flags').split()) if match else ()
def Time2Internaldate(date_time):
    """Convert *date_time* to an IMAP4 INTERNALDATE representation.

    Returns a string of the form '"DD-Mmm-YYYY HH:MM:SS +HHMM"' (including
    the double quotes).  *date_time* may be:

    * an int or float: seconds since the epoch, rendered in local time;
    * a tuple (such as a time.struct_time): its first six items are used,
      with the UTC offset taken from ``tm_gmtoff`` when available and from
      the local time zone settings otherwise;
    * an aware datetime.datetime;
    * a str already enclosed in double quotes, returned unchanged.

    Raises ValueError for a naive datetime or any other value, including
    an empty string.
    """
    if isinstance(date_time, (int, float)):
        dt = datetime.fromtimestamp(date_time,
                                    timezone.utc).astimezone()
    elif isinstance(date_time, tuple):
        # A struct_time built by hand has tm_gmtoff set to None, and a
        # plain tuple has no such attribute; both fall back to the local
        # zone settings instead of failing inside timedelta().
        gmtoff = getattr(date_time, 'tm_gmtoff', None)
        if gmtoff is None:
            if time.daylight:
                dst = date_time[8]
                if dst == -1:
                    # DST flag unknown: let mktime()/localtime() decide.
                    dst = time.localtime(time.mktime(date_time))[8]
                gmtoff = -(time.timezone, time.altzone)[dst]
            else:
                gmtoff = -time.timezone
        delta = timedelta(seconds=gmtoff)
        dt = datetime(*date_time[:6], tzinfo=timezone(delta))
    elif isinstance(date_time, datetime):
        if date_time.tzinfo is None:
            raise ValueError("date_time must be aware")
        dt = date_time
    # Slices rather than indexes: an empty string must end up in the
    # ValueError branch below, not raise IndexError.
    elif (isinstance(date_time, str)
          and (date_time[:1], date_time[-1:]) == ('"', '"')):
        return date_time        # Assume in correct format
    else:
        raise ValueError("date_time not of a known type")
    fmt = '"%d-{}-%Y %H:%M:%S %z"'.format(Months[dt.month])
    return dt.strftime(fmt)
if __name__ == '__main__':

    # Self-test.  Invoke with an IMAP4 server host name as the argument,
    # or with -s "<command>" to exercise IMAP4_stream through a
    # subprocess.  -d <level> sets the debug level.

    import getopt, getpass

    try:
        optlist, args = getopt.getopt(sys.argv[1:], 'd:s:')
    except getopt.error as val:
        optlist, args = (), ()

    stream_command = None
    for opt,val in optlist:
        if opt == '-d':
            Debug = int(val)
        elif opt == '-s':
            stream_command = val
            if not args: args = (stream_command,)

    if not args: args = ('',)

    host = args[0]

    USER = getpass.getuser()
    PASSWD = getpass.getpass("IMAP password for %s on %s: " % (USER, host or "localhost"))

    # append() applies a bytes regex to the message, so it must be bytes.
    test_mesg = ('From: %(user)s@localhost%(lf)sSubject: IMAP4 test%(lf)s%(lf)sdata...%(lf)s'
                 % {'user':USER, 'lf':'\n'}).encode('utf-8')
    test_seq1 = (
    ('login', (USER, PASSWD)),
    ('create', ('/tmp/xxx 1',)),
    ('rename', ('/tmp/xxx 1', '/tmp/yyy')),
    ('CREATE', ('/tmp/yyz 2',)),
    ('append', ('/tmp/yyz 2', None, None, test_mesg)),
    ('list', ('/tmp', 'yy*')),
    ('select', ('/tmp/yyz 2',)),
    ('search', (None, 'SUBJECT', 'test')),
    ('fetch', ('1', '(FLAGS INTERNALDATE RFC822)')),
    ('store', ('1', 'FLAGS', r'(\Deleted)')),
    ('namespace', ()),
    ('expunge', ()),
    ('recent', ()),
    ('close', ()),
    )

    test_seq2 = (
    ('select', ()),
    ('response',('UIDVALIDITY',)),
    ('uid', ('SEARCH', 'ALL')),
    ('response', ('EXISTS',)),
    ('append', (None, None, None, test_mesg)),
    ('recent', ()),
    ('logout', ()),
    )

    def run(cmd, args):
        """Run one client method, log it, and fail on a NO reply."""
        M._mesg('%s %s' % (cmd, args))
        typ, dat = getattr(M, cmd)(*args)
        M._mesg('%s => %s %s' % (cmd, typ, dat))
        # The response data is bytes, not an exception instance, so it
        # has to be wrapped before it can be raised.
        if typ == 'NO': raise M.error(dat[0])
        return dat

    try:
        if stream_command:
            M = IMAP4_stream(stream_command)
        else:
            M = IMAP4(host)
        if M.state == 'AUTH':
            test_seq1 = test_seq1[1:]   # Login not needed
        M._mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION)
        M._mesg('CAPABILITIES = %r' % (M.capabilities,))

        for cmd,args in test_seq1:
            run(cmd, args)

        for ml in run('list', ('/tmp/', 'yy%')):
            # The data is [None] when the server sent no LIST response.
            if not ml:
                continue
            # Responses are bytes, so the pattern must be bytes too.
            mo = re.match(br'.*"([^"]+)"$', ml)
            if mo: path = mo.group(1)
            else: path = ml.split()[-1]
            run('delete', (path,))

        for cmd,args in test_seq2:
            dat = run(cmd, args)

            if (cmd,args) != ('uid', ('SEARCH', 'ALL')):
                continue

            uid = (dat[-1] or b'').split()
            if not uid: continue
            # Decode the UID: '%s' % bytes would send the text "b'..'".
            run('uid', ('FETCH', uid[-1].decode('ascii'),
                    '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)'))

        print('\nAll tests OK.')

    except:
        print('\nTests failed.')

        if not Debug:
            print('''
If you would like to see debugging output,
try: %s -d5
''' % sys.argv[0])

        raise